This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 642bf00501e Enhance ValueAggregator (#16552)
642bf00501e is described below

commit 642bf00501ef0cc0ddb79ade00b2eff695590ea0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 8 18:45:14 2025 -0600

    Enhance ValueAggregator (#16552)
---
 .../local/aggregator/AvgValueAggregator.java       |  11 +-
 .../local/aggregator/CountValueAggregator.java     |  10 +-
 .../DistinctCountBitmapValueAggregator.java        |   7 +
 .../DistinctCountCPCSketchValueAggregator.java     |  26 +-
 .../DistinctCountHLLPlusValueAggregator.java       |  11 +-
 .../DistinctCountHLLValueAggregator.java           |  11 +-
 .../DistinctCountThetaSketchValueAggregator.java   |   7 +
 .../DistinctCountULLValueAggregator.java           |  11 +-
 .../IntegerTupleSketchValueAggregator.java         |  13 +-
 .../local/aggregator/MaxValueAggregator.java       |  11 +-
 .../aggregator/MinMaxRangeValueAggregator.java     |  11 +-
 .../local/aggregator/MinValueAggregator.java       |  11 +-
 .../aggregator/PercentileEstValueAggregator.java   |   7 +
 .../PercentileTDigestValueAggregator.java          |   7 +
 .../aggregator/SumPrecisionValueAggregator.java    |  11 +-
 .../local/aggregator/SumValueAggregator.java       |  11 +-
 .../segment/local/aggregator/ValueAggregator.java  |  11 +-
 .../local/aggregator/ValueAggregatorFactory.java   |  24 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 117 +++++---
 .../startree/v2/builder/BaseSingleTreeBuilder.java |  17 +-
 .../segment/local/utils/TableConfigUtils.java      |  30 +-
 .../DistinctCountCPCSketchValueAggregatorTest.java |   6 +
 .../DistinctCountULLValueAggregatorTest.java       |   6 +
 .../IntegerTupleSketchValueAggregatorTest.java     |   7 +
 .../local/aggregator/ValueAggregatorTest.java      |  62 ++++
 ...MutableSegmentImplIngestionAggregationTest.java | 322 +++++++--------------
 26 files changed, 462 insertions(+), 316 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
index 7ba199642e4..b999d49ea7e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.customobject.AvgPair;
 import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -38,7 +39,10 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
   }
 
   @Override
-  public AvgPair getInitialAggregatedValue(Object rawValue) {
+  public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return new AvgPair();
+    }
     if (rawValue instanceof byte[]) {
       return deserializeAggregatedValue((byte[]) rawValue);
     } else {
@@ -67,6 +71,11 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
     return new AvgPair(value.getSum(), value.getCount());
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Double.BYTES + Long.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
index 177578935b7..2b7e01a6575 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -36,8 +37,8 @@ public class CountValueAggregator implements ValueAggregator<Object, Long> {
   }
 
   @Override
-  public Long getInitialAggregatedValue(Object rawValue) {
-    return 1L;
+  public Long getInitialAggregatedValue(@Nullable Object rawValue) {
+    return rawValue != null ? 1L : 0L;
   }
 
   @Override
@@ -55,6 +56,11 @@ public class CountValueAggregator implements ValueAggregator<Object, Long> {
     return value;
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Long.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
index 5da5ae1963c..64cceab6bb5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
@@ -41,6 +41,8 @@ public class DistinctCountBitmapValueAggregator implements ValueAggregator<Objec
 
   @Override
   public RoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+    assert rawValue != null;
     RoaringBitmap initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -77,6 +79,11 @@ public class DistinctCountBitmapValueAggregator implements ValueAggregator<Objec
     return value.clone();
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return false;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return _maxByteSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
index 6bc816f50ab..7ff2a636de7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.aggregator;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.datasketches.cpc.CpcUnion;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -54,8 +55,11 @@ public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Ob
   }
 
   @Override
-  public Object getInitialAggregatedValue(Object rawValue) {
+  public Object getInitialAggregatedValue(@Nullable Object rawValue) {
     CpcUnion cpcUnion = new CpcUnion(_lgK);
+    if (rawValue == null) {
+      return cpcUnion;
+    }
     if (rawValue instanceof byte[]) { // Serialized Sketch
       byte[] bytes = (byte[]) rawValue;
       cpcUnion.update(deserializeAggregatedValue(bytes));
@@ -100,6 +104,11 @@ public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Ob
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return CpcSketch.getMaxSerializedBytes(_lgK);
@@ -121,21 +130,6 @@ public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Ob
     return _lgK;
   }
 
-  private CpcSketch union(CpcSketch left, CpcSketch right) {
-    if (left == null && right == null) {
-      return empty();
-    } else if (left == null) {
-      return right;
-    } else if (right == null) {
-      return left;
-    }
-
-    CpcUnion union = new CpcUnion(_lgK);
-    union.update(left);
-    union.update(right);
-    return union.getResult();
-  }
-
   private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
     if (rawValue instanceof String) {
       sketch.update((String) rawValue);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
index a4857a1b8de..6c1a50e5c57 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
@@ -22,6 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.segment.local.utils.HyperLogLogPlusUtils;
@@ -64,7 +65,10 @@ public class DistinctCountHLLPlusValueAggregator implements ValueAggregator<Obje
   }
 
   @Override
-  public HyperLogLogPlus getInitialAggregatedValue(Object rawValue) {
+  public HyperLogLogPlus getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return new HyperLogLogPlus(_p, _sp);
+    }
     HyperLogLogPlus initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -107,6 +111,11 @@ public class DistinctCountHLLPlusValueAggregator implements ValueAggregator<Obje
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
    // NOTE: For aggregated metrics, initial aggregated value might have not been generated. Returns the byte size
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
index 0ca535c9d8e..6414231f913 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
@@ -22,6 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.segment.local.utils.HyperLogLogUtils;
@@ -58,7 +59,10 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
   }
 
   @Override
-  public HyperLogLog getInitialAggregatedValue(Object rawValue) {
+  public HyperLogLog getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return new HyperLogLog(_log2m);
+    }
     HyperLogLog initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -101,6 +105,11 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
    // NOTE: For aggregated metrics, initial aggregated value might have not been generated. Returns the byte size
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
index c19bb0a0f17..5dba35d637c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
@@ -116,6 +116,8 @@ public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<
 
   @Override
   public Object getInitialAggregatedValue(Object rawValue) {
+    // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+    assert rawValue != null;
     Union thetaUnion = _setOperationBuilder.buildUnion();
     if (rawValue instanceof byte[]) { // Serialized Sketch
       byte[] bytes = (byte[]) rawValue;
@@ -175,6 +177,11 @@ public class DistinctCountThetaSketchValueAggregator implements ValueAggregator<
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return false;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return _maxByteSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
index d0447307151..cfc7a91cccf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.aggregator;
 import com.dynatrace.hash4j.distinctcount.UltraLogLog;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
@@ -54,7 +55,10 @@ public class DistinctCountULLValueAggregator implements ValueAggregator<Object,
   }
 
   @Override
-  public UltraLogLog getInitialAggregatedValue(Object rawValue) {
+  public UltraLogLog getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return UltraLogLog.create(_p);
+    }
     UltraLogLog initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -83,6 +87,11 @@ public class DistinctCountULLValueAggregator implements ValueAggregator<Object,
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return (1 << _p) + 1;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
index f5294db788d..e5db940dbcd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.aggregator;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.Union;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -60,9 +61,12 @@ public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[]
   }
 
   @Override
-  public Object getInitialAggregatedValue(byte[] rawValue) {
-    Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
+  public Object getInitialAggregatedValue(@Nullable byte[] rawValue) {
     Union tupleUnion = new Union<>(_nominalEntries, new IntegerSummarySetOperations(_mode, _mode));
+    if (rawValue == null) {
+      return tupleUnion;
+    }
+    Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
     tupleUnion.union(initialValue);
     return tupleUnion;
   }
@@ -114,6 +118,11 @@ public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[]
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   /**
    * Returns the maximum number of storage bytes required for a Compact Integer Tuple Sketch with the given
    * number of actual entries. Note that this assumes the worst case of the sketch in
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
index f5b51833fae..d5fcce740a6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -36,7 +37,10 @@ public class MaxValueAggregator implements ValueAggregator<Object, Double> {
   }
 
   @Override
-  public Double getInitialAggregatedValue(Object rawValue) {
+  public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return Double.NEGATIVE_INFINITY;
+    }
     return ValueAggregatorUtils.toDouble(rawValue);
   }
 
@@ -55,6 +59,11 @@ public class MaxValueAggregator implements ValueAggregator<Object, Double> {
     return value;
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Double.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
index d6aeb1195ea..2f47ba7d280 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -38,7 +39,10 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
   }
 
   @Override
-  public MinMaxRangePair getInitialAggregatedValue(Object rawValue) {
+  public MinMaxRangePair getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return new MinMaxRangePair();
+    }
     if (rawValue instanceof byte[]) {
       return deserializeAggregatedValue((byte[]) rawValue);
     } else {
@@ -68,6 +72,11 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
     return new MinMaxRangePair(value.getMin(), value.getMax());
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Double.BYTES + Double.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
index 2ebc95c123c..4c4edf62d65 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -36,7 +37,10 @@ public class MinValueAggregator implements ValueAggregator<Object, Double> {
   }
 
   @Override
-  public Double getInitialAggregatedValue(Object rawValue) {
+  public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return Double.POSITIVE_INFINITY;
+    }
     return ValueAggregatorUtils.toDouble(rawValue);
   }
 
@@ -55,6 +59,11 @@ public class MinValueAggregator implements ValueAggregator<Object, Double> {
     return value;
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Double.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
index 4915461b87e..739b0a82b76 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
@@ -44,6 +44,8 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
 
   @Override
   public QuantileDigest getInitialAggregatedValue(Object rawValue) {
+    // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+    assert rawValue != null;
     QuantileDigest initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -92,6 +94,11 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return false;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return _maxByteSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
index 054b52f9bd7..0e1a2caf342 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
@@ -54,6 +54,8 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
 
   @Override
   public TDigest getInitialAggregatedValue(Object rawValue) {
+    // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+    assert rawValue != null;
     TDigest initialValue;
     if (rawValue instanceof byte[]) {
       byte[] bytes = (byte[]) rawValue;
@@ -90,6 +92,11 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
     return deserializeAggregatedValue(serializeAggregatedValue(value));
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return false;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return _maxByteSize;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
index f8074129838..1d97da7b263 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.aggregator;
 import com.google.common.base.Preconditions;
 import java.math.BigDecimal;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -58,7 +59,10 @@ public class SumPrecisionValueAggregator implements ValueAggregator<Object, BigD
   }
 
   @Override
-  public BigDecimal getInitialAggregatedValue(Object rawValue) {
+  public BigDecimal getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return BigDecimal.ZERO;
+    }
     BigDecimal initialValue = toBigDecimal(rawValue);
     if (_fixedSize < 0) {
      _maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(initialValue));
@@ -100,6 +104,11 @@ public class SumPrecisionValueAggregator implements ValueAggregator<Object, BigD
     return value;
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return _fixedSize > 0;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     Preconditions.checkState(_fixedSize > 0 || _maxByteSize > 0,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
index 1f79b6cc6c8..524c08c3396 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -36,7 +37,10 @@ public class SumValueAggregator implements ValueAggregator<Object, Double> {
   }
 
   @Override
-  public Double getInitialAggregatedValue(Object rawValue) {
+  public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+    if (rawValue == null) {
+      return 0.0;
+    }
     return ValueAggregatorUtils.toDouble(rawValue);
   }
 
@@ -55,6 +59,11 @@ public class SumValueAggregator implements ValueAggregator<Object, Double> {
     return value;
   }
 
+  @Override
+  public boolean isAggregatedValueFixedSize() {
+    return true;
+  }
+
   @Override
   public int getMaxAggregatedValueByteSize() {
     return Double.BYTES;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
index 7743fc7c314..20b8c3971cd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -42,8 +43,10 @@ public interface ValueAggregator<R, A> {
 
   /**
    * Returns the initial aggregated value.
+   * <p>NOTE: rawValue can be null when the aggregator is used for ingestion aggregation, and the column is not
+   * specified in the schema.
    */
-  A getInitialAggregatedValue(R rawValue);
+  A getInitialAggregatedValue(@Nullable R rawValue);
 
   /**
    * Applies a raw value to the current aggregated value.
@@ -62,6 +65,12 @@ public interface ValueAggregator<R, A> {
    */
   A cloneAggregatedValue(A value);
 
+  /**
+   * Returns whether the aggregated value is of fixed size. Value aggregator can be used for ingestion aggregation only
+   * when the aggregated value is of fixed size.
+   */
+  boolean isAggregatedValueFixedSize();
+
   /**
    * Returns the maximum size in bytes of the aggregated values seen so far.
    */
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 2ba883854f9..4ada0134954 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -61,18 +61,15 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTHLL:
       case DISTINCTCOUNTRAWHLL:
         return new DistinctCountHLLValueAggregator(arguments);
-      case PERCENTILEEST:
-      case PERCENTILERAWEST:
-        return new PercentileEstValueAggregator();
-      case PERCENTILETDIGEST:
-      case PERCENTILERAWTDIGEST:
-        return new PercentileTDigestValueAggregator(arguments);
-      case DISTINCTCOUNTTHETASKETCH:
-      case DISTINCTCOUNTRAWTHETASKETCH:
-        return new DistinctCountThetaSketchValueAggregator(arguments);
       case DISTINCTCOUNTHLLPLUS:
       case DISTINCTCOUNTRAWHLLPLUS:
         return new DistinctCountHLLPlusValueAggregator(arguments);
+      case DISTINCTCOUNTULL:
+      case DISTINCTCOUNTRAWULL:
+        return new DistinctCountULLValueAggregator(arguments);
+      case DISTINCTCOUNTTHETASKETCH:
+      case DISTINCTCOUNTRAWTHETASKETCH:
+        return new DistinctCountThetaSketchValueAggregator(arguments);
       case DISTINCTCOUNTTUPLESKETCH:
       case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
       case AVGVALUEINTEGERSUMTUPLESKETCH:
@@ -81,9 +78,12 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTCPCSKETCH:
       case DISTINCTCOUNTRAWCPCSKETCH:
         return new DistinctCountCPCSketchValueAggregator(arguments);
-      case DISTINCTCOUNTULL:
-      case DISTINCTCOUNTRAWULL:
-        return new DistinctCountULLValueAggregator(arguments);
+      case PERCENTILEEST:
+      case PERCENTILERAWEST:
+        return new PercentileEstValueAggregator();
+      case PERCENTILETDIGEST:
+      case PERCENTILERAWTDIGEST:
+        return new PercentileTDigestValueAggregator(arguments);
       default:
        throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index a3a6b31fdd1..9c4ac15a873 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
 import it.unimi.dsi.fastutil.booleans.BooleanList;
@@ -75,7 +76,6 @@ import org.apache.pinot.segment.local.upsert.UpsertContext;
 import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
 import org.apache.pinot.segment.local.utils.IdMap;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -96,6 +96,7 @@ import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
 import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
 import org.apache.pinot.segment.spi.index.reader.MultiColumnTextIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
@@ -480,7 +481,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
     if (segmentConfig.aggregateMetrics()) {
       return fromAggregateMetrics(segmentConfig);
-    } else if (!CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
+    } else if (CollectionUtils.isNotEmpty(segmentConfig.getIngestionAggregationConfigs())) {
       return fromAggregationConfig(segmentConfig);
     } else {
       return Collections.emptyMap();
@@ -491,8 +492,10 @@ public class MutableSegmentImpl implements MutableSegment {
     Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()),
         "aggregateMetrics cannot be enabled if AggregationConfig is set");
 
-    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>();
-    for (String metricName : segmentConfig.getSchema().getMetricNames()) {
+    List<String> metricNames = segmentConfig.getSchema().getMetricNames();
+    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator =
+        Maps.newHashMapWithExpectedSize(metricNames.size());
+    for (String metricName : metricNames) {
       columnNameToAggregator.put(metricName, Pair.of(metricName,
           ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.emptyList())));
     }
@@ -500,11 +503,11 @@ public class MutableSegmentImpl implements MutableSegment {
   }
 
   private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(RealtimeSegmentConfig segmentConfig) {
-    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>();
-
-    Preconditions.checkState(!segmentConfig.aggregateMetrics(),
-        "aggregateMetrics cannot be enabled if AggregationConfig is set");
-    for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) {
+    List<AggregationConfig> aggregationConfigs = segmentConfig.getIngestionAggregationConfigs();
+    assert !segmentConfig.aggregateMetrics() && CollectionUtils.isNotEmpty(aggregationConfigs);
+    Map<String, Pair<String, ValueAggregator>> columnNameToAggregator =
+        Maps.newHashMapWithExpectedSize(aggregationConfigs.size());
+    for (AggregationConfig config : aggregationConfigs) {
       ExpressionContext expressionContext = RequestContextUtils.getExpression(config.getAggregationFunction());
       // validation is also done when the table is created, this is just a sanity check.
       Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
@@ -512,14 +515,16 @@ public class MutableSegmentImpl implements MutableSegment {
       FunctionContext functionContext = expressionContext.getFunction();
       AggregationFunctionType functionType =
           AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
-      TableConfigUtils.validateIngestionAggregation(functionType);
-      ExpressionContext argument = functionContext.getArguments().get(0);
+      List<ExpressionContext> arguments = functionContext.getArguments();
+      ExpressionContext argument = arguments.get(0);
       Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
           "aggregator function argument must be a identifier: %s", config);
+      ValueAggregator valueAggregator =
+          ValueAggregatorFactory.getValueAggregator(functionType, arguments.subList(1, arguments.size()));
+      Preconditions.checkState(valueAggregator.isAggregatedValueFixedSize(),
+          "aggregator function must have fixed size aggregated value: %s", config);
 
-      columnNameToAggregator.put(config.getColumnName(), Pair.of(argument.getIdentifier(),
-          ValueAggregatorFactory.getValueAggregator(functionType,
-              functionContext.getArguments().subList(1, functionContext.getArguments().size()))));
+      columnNameToAggregator.put(config.getColumnName(), Pair.of(argument.getIdentifier(), valueAggregator));
     }
 
     return columnNameToAggregator;
@@ -811,9 +816,17 @@ public class MutableSegmentImpl implements MutableSegment {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
 
-      // aggregate metrics is enabled.
-      if (indexContainer._valueAggregator != null) {
-        Object value = row.getValue(indexContainer._sourceColumn);
+      // Handle ingestion aggregation
+      ValueAggregator valueAggregator = indexContainer._valueAggregator;
+      if (valueAggregator != null) {
+        String sourceColumn = indexContainer._sourceColumn;
+        // NOTE: value can be null if the column is not specified in the schema.
+        Object value = row.getValue(sourceColumn);
+        // Handle COUNT(*)
+        if (value == null && sourceColumn.equals(AggregationFunctionColumnPair.STAR)) {
+          assert valueAggregator.getAggregationType() == AggregationFunctionType.COUNT;
+          value = 1;
+        }
 
         // Update numValues info
         indexContainer._valuesInfo.updateSVNumValues();
@@ -822,7 +835,7 @@ public class MutableSegmentImpl implements MutableSegment {
         FieldSpec fieldSpec = indexContainer._fieldSpec;
 
         DataType dataType = fieldSpec.getDataType();
-        value = indexContainer._valueAggregator.getInitialAggregatedValue(value);
+        value = valueAggregator.getInitialAggregatedValue(value);
        // BIG_DECIMAL is actually stored as byte[] and hence can be supported here.
         switch (dataType.getStoredType()) {
           case INT:
@@ -839,7 +852,7 @@ public class MutableSegmentImpl implements MutableSegment {
             break;
           case BIG_DECIMAL:
           case BYTES:
-            forwardIndex.add(indexContainer._valueAggregator.serializeAggregatedValue(value), -1, docId);
+            forwardIndex.add(valueAggregator.serializeAggregatedValue(value), -1, docId);
             break;
           default:
             throw new UnsupportedOperationException(
@@ -926,7 +939,7 @@ public class MutableSegmentImpl implements MutableSegment {
         if (_multiColumnValues != null) {
           int pos = _multiColumnPos.getInt(column);
           if (pos > -1) {
-            _multiColumnValues.set(pos, (String) value);
+            _multiColumnValues.set(pos, value);
           }
         }
       } else {
@@ -1011,37 +1024,47 @@ public class MutableSegmentImpl implements MutableSegment {
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
      IndexContainer indexContainer = _indexContainerMap.get(metricFieldSpec.getName());
-      Object value = row.getValue(indexContainer._sourceColumn);
+      ValueAggregator valueAggregator = indexContainer._valueAggregator;
+      String sourceColumn = indexContainer._sourceColumn;
+      // NOTE: value can be null if the column is not specified in the schema.
+      Object value = row.getValue(sourceColumn);
+      // Skip aggregation if the input value is null.
+      if (value == null) {
+        // Handle COUNT(*)
+        if (sourceColumn.equals(AggregationFunctionColumnPair.STAR)) {
+          assert valueAggregator.getAggregationType() == AggregationFunctionType.COUNT;
+          value = 1;
+        } else {
+          continue;
+        }
+      }
       MutableForwardIndex forwardIndex =
           (MutableForwardIndex) indexContainer._mutableIndexes.get(StandardIndexes.forward());
       DataType dataType = metricFieldSpec.getDataType();
 
-      Double oldDoubleValue;
-      Double newDoubleValue;
-      Long oldLongValue;
-      Long newLongValue;
-      ValueAggregator valueAggregator = indexContainer._valueAggregator;
       switch (valueAggregator.getAggregatedValueType()) {
         case DOUBLE:
+          double oldDoubleValue;
+          double newDoubleValue;
           switch (dataType) {
             case INT:
-              oldDoubleValue = ((Integer) forwardIndex.getInt(docId)).doubleValue();
-              newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value);
-              forwardIndex.setInt(docId, newDoubleValue.intValue());
+              oldDoubleValue = forwardIndex.getInt(docId);
+              newDoubleValue = (double) valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setInt(docId, (int) newDoubleValue);
               break;
             case LONG:
-              oldDoubleValue = ((Long) forwardIndex.getLong(docId)).doubleValue();
-              newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value);
-              forwardIndex.setLong(docId, newDoubleValue.longValue());
+              oldDoubleValue = forwardIndex.getLong(docId);
+              newDoubleValue = (double) valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setLong(docId, (long) newDoubleValue);
               break;
             case FLOAT:
-              oldDoubleValue = ((Float) forwardIndex.getFloat(docId)).doubleValue();
-              newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value);
-              forwardIndex.setFloat(docId, newDoubleValue.floatValue());
+              oldDoubleValue = forwardIndex.getFloat(docId);
+              newDoubleValue = (double) valueAggregator.applyRawValue(oldDoubleValue, value);
+              forwardIndex.setFloat(docId, (float) newDoubleValue);
               break;
             case DOUBLE:
               oldDoubleValue = forwardIndex.getDouble(docId);
-              newDoubleValue = (Double) valueAggregator.applyRawValue(oldDoubleValue, value);
+              newDoubleValue = (double) valueAggregator.applyRawValue(oldDoubleValue, value);
               forwardIndex.setDouble(docId, newDoubleValue);
               break;
             default:
@@ -1050,26 +1073,28 @@ public class MutableSegmentImpl implements MutableSegment {
           }
           break;
         case LONG:
+          long oldLongValue;
+          long newLongValue;
           switch (dataType) {
             case INT:
-              oldLongValue = ((Integer) forwardIndex.getInt(docId)).longValue();
-              newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value);
-              forwardIndex.setInt(docId, newLongValue.intValue());
+              oldLongValue = forwardIndex.getInt(docId);
+              newLongValue = (long) valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setInt(docId, (int) newLongValue);
               break;
             case LONG:
               oldLongValue = forwardIndex.getLong(docId);
-              newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value);
+              newLongValue = (long) valueAggregator.applyRawValue(oldLongValue, value);
               forwardIndex.setLong(docId, newLongValue);
               break;
             case FLOAT:
-              oldLongValue = ((Float) forwardIndex.getFloat(docId)).longValue();
-              newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value);
-              forwardIndex.setFloat(docId, newLongValue.floatValue());
+              oldLongValue = (long) forwardIndex.getFloat(docId);
+              newLongValue = (long) valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setFloat(docId, (float) newLongValue);
               break;
             case DOUBLE:
-              oldLongValue = ((Double) forwardIndex.getDouble(docId)).longValue();
-              newLongValue = (Long) valueAggregator.applyRawValue(oldLongValue, value);
-              forwardIndex.setDouble(docId, newLongValue.doubleValue());
+              oldLongValue = (long) forwardIndex.getDouble(docId);
+              newLongValue = (long) valueAggregator.applyRawValue(oldLongValue, value);
+              forwardIndex.setDouble(docId, (double) newLongValue);
               break;
             default:
              throw new UnsupportedOperationException(String.format("Aggregation type %s of %s not supported for %s",
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
index f797c077a86..517bcd4a272 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
@@ -263,13 +263,24 @@ abstract class BaseSingleTreeBuilder implements SingleTreeBuilder {
       int[] dimensions = Arrays.copyOf(segmentRecord._dimensions, _numDimensions);
       Object[] metrics = new Object[_numMetrics];
       for (int i = 0; i < _numMetrics; i++) {
-        metrics[i] = _valueAggregators[i].getInitialAggregatedValue(segmentRecord._metrics[i]);
+        Object rawValue = segmentRecord._metrics[i];
+        if (rawValue != null) {
+          metrics[i] = _valueAggregators[i].getInitialAggregatedValue(rawValue);
+        } else {
+          assert _valueAggregators[i].getAggregationType() == AggregationFunctionType.COUNT;
+          metrics[i] = 1L;
+        }
       }
       return new Record(dimensions, metrics);
     } else {
       for (int i = 0; i < _numMetrics; i++) {
-        aggregatedRecord._metrics[i] =
-            _valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i], segmentRecord._metrics[i]);
+        Object rawValue = segmentRecord._metrics[i];
+        if (rawValue != null) {
+          aggregatedRecord._metrics[i] = _valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i], rawValue);
+        } else {
+          assert _valueAggregators[i].getAggregationType() == AggregationFunctionType.COUNT;
+          aggregatedRecord._metrics[i] = ((long) aggregatedRecord._metrics[i]) + 1;
+        }
       }
       return aggregatedRecord;
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index cd4833dd044..c1fdb7c690b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -44,6 +43,8 @@ import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
@@ -105,7 +106,9 @@ import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLL;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.SUMPRECISION;
 
 
 /**
@@ -124,8 +127,6 @@ public final class TableConfigUtils {
   // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
-  private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
-      EnumSet.of(SUM, MIN, MAX, COUNT, DISTINCTCOUNTHLL, SUMPRECISION, DISTINCTCOUNTHLLPLUS);
 
   private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
       ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
@@ -418,7 +419,7 @@ public final class TableConfigUtils {
       // Aggregation configs
       List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
       Set<String> aggregationSourceColumns = new HashSet<>();
-      if (!CollectionUtils.isEmpty(aggregationConfigs)) {
+      if (CollectionUtils.isNotEmpty(aggregationConfigs)) {
         Preconditions.checkState(!tableConfig.getIndexingConfig().isAggregateMetrics(),
             "aggregateMetrics cannot be set with AggregationConfig");
         Set<String> aggregationColumns = new HashSet<>();
@@ -453,8 +454,6 @@ public final class TableConfigUtils {
           FunctionContext functionContext = expressionContext.getFunction();
           AggregationFunctionType functionType =
               AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
-          validateIngestionAggregation(functionType);
-
           List<ExpressionContext> arguments = functionContext.getArguments();
           int numArguments = arguments.size();
           if (functionType == DISTINCTCOUNTHLL) {
@@ -510,6 +509,18 @@ public final class TableConfigUtils {
           Preconditions.checkState(firstArgument.getType() == ExpressionContext.Type.IDENTIFIER,
               "First argument of aggregation function: %s must be identifier, got: %s", functionType,
               firstArgument.getType());
+          // Create a ValueAggregator for the aggregation function and check if it is supported for ingestion (fixed
+          // size aggregated value).
+          ValueAggregator<?, ?> valueAggregator;
+          try {
+            valueAggregator =
+                ValueAggregatorFactory.getValueAggregator(functionType, arguments.subList(1, numArguments));
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Caught exception while creating ValueAggregator for 
aggregation function: " + aggregationFunction, e);
+          }
+          Preconditions.checkState(valueAggregator.isAggregatedValueFixedSize(),
+              "Aggregation function: %s must have fixed size aggregated value", aggregationFunction);
 
           aggregationSourceColumns.add(firstArgument.getIdentifier());
         }
@@ -601,11 +612,6 @@ public final class TableConfigUtils {
     }
   }
 
-  public static void validateIngestionAggregation(AggregationFunctionType functionType) {
-    Preconditions.checkState(SUPPORTED_INGESTION_AGGREGATIONS.contains(functionType),
-        "Aggregation function: %s must be one of: %s", functionType, SUPPORTED_INGESTION_AGGREGATIONS);
-  }
-
   private static void validateStreamConfigMaps(TableConfig tableConfig) {
    List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
     int numStreamConfigs = streamConfigMaps.size();
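
(Illustrative sketch, not part of this commit: the validation above no longer consults a hard-coded EnumSet of supported ingestion aggregations; it builds the ValueAggregator and requires a fixed-size aggregated value. The helper below is hypothetical and mirrors that check; the expected results match the new ValueAggregatorTest data provider.)

    import java.util.Collections;
    import org.apache.pinot.segment.local.aggregator.ValueAggregator;
    import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
    import org.apache.pinot.segment.spi.AggregationFunctionType;

    public class IngestionAggregationSupportCheck {
      // Whether a function can back an ingestion AggregationConfig under the new rule.
      static boolean isSupportedForIngestion(AggregationFunctionType functionType) {
        ValueAggregator<?, ?> aggregator =
            ValueAggregatorFactory.getValueAggregator(functionType, Collections.emptyList());
        return aggregator.isAggregatedValueFixedSize();
      }

      public static void main(String[] args) {
        System.out.println(isSupportedForIngestion(AggregationFunctionType.DISTINCTCOUNTHLL));         // true
        System.out.println(isSupportedForIngestion(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH)); // false
      }
    }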
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
index c9bc80f8264..25e6588b412 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
@@ -59,6 +59,12 @@ public class DistinctCountCPCSketchValueAggregatorTest {
     assertEquals(Math.round(toSketch(agg.getInitialAggregatedValue(bytes)).getEstimate()), 2);
   }
 
+  @Test
+  public void nullInitialShouldReturnEmptySketch() {
+    DistinctCountCPCSketchValueAggregator agg = new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+    assertEquals(toSketch(agg.getInitialAggregatedValue(null)).getEstimate(), 0.0);
+  }
+
   @Test
   public void applyAggregatedValueShouldUnion() {
     CpcSketch input1 = new CpcSketch();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
index f7b5fc14dc9..78ba4dacbe3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
@@ -80,6 +80,12 @@ public class DistinctCountULLValueAggregatorTest {
     assertEquals(aggregated.getP(), 12);
   }
 
+  @Test
+  public void nullInitialShouldReturnEmptyULL() {
+    DistinctCountULLValueAggregator agg = new DistinctCountULLValueAggregator(Collections.emptyList());
+    assertEquals(agg.getInitialAggregatedValue(null).getDistinctCountEstimate(), 0.0);
+  }
+
   @Test
   public void applyAggregatedValueShouldUnion() {
     UltraLogLog input1 = UltraLogLog.create(12);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
index bc95c382c87..c6dbeb6d7f4 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
@@ -43,6 +43,13 @@ public class IntegerTupleSketchValueAggregatorTest {
     assertEquals(toSketch(agg.getInitialAggregatedValue(sketchContaining("hello world", 1))).getEstimate(), 1.0);
   }
 
+  @Test
+  public void nullInitialShouldReturnEmptySketch() {
+    IntegerTupleSketchValueAggregator agg =
+        new IntegerTupleSketchValueAggregator(Collections.emptyList(), IntegerSummary.Mode.Sum);
+    assertEquals(toSketch(agg.getInitialAggregatedValue(null)).getEstimate(), 0.0);
+  }
+
   @Test
   public void applyAggregatedValueShouldUnion() {
     IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
new file mode 100644
index 00000000000..d983ce259c7
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.List;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ValueAggregatorTest {
+
+  @Test(dataProvider = "fixedSizeAggregatedValue")
+  public void testFixedSizeAggregatedValue(AggregationFunctionType functionType, List<ExpressionContext> arguments,
+      boolean expected) {
+    assertEquals(ValueAggregatorFactory.getValueAggregator(functionType, arguments).isAggregatedValueFixedSize(),
+        expected);
+  }
+
+  @DataProvider
+  public static Object[][] fixedSizeAggregatedValue() {
+    return new Object[][]{
+        {AggregationFunctionType.COUNT, List.of(), true},
+        {AggregationFunctionType.MIN, List.of(), true},
+        {AggregationFunctionType.MAX, List.of(), true},
+        {AggregationFunctionType.SUM, List.of(), true},
+        {AggregationFunctionType.SUMPRECISION, List.of(), false},
+        {AggregationFunctionType.SUMPRECISION, List.of(ExpressionContext.forLiteral(Literal.intValue(20))), true},
+        {AggregationFunctionType.AVG, List.of(), true},
+        {AggregationFunctionType.MINMAXRANGE, List.of(), true},
+        {AggregationFunctionType.DISTINCTCOUNTBITMAP, List.of(), false},
+        {AggregationFunctionType.DISTINCTCOUNTHLL, List.of(), true},
+        {AggregationFunctionType.DISTINCTCOUNTHLLPLUS, List.of(), true},
+        {AggregationFunctionType.DISTINCTCOUNTULL, List.of(), true},
+        {AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, List.of(), false},
+        {AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH, List.of(), true},
+        {AggregationFunctionType.DISTINCTCOUNTCPCSKETCH, List.of(), true},
+        {AggregationFunctionType.PERCENTILEEST, List.of(), false},
+        {AggregationFunctionType.PERCENTILETDIGEST, List.of(), false}
+    };
+  }
+}
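
The data provider above also documents the new isAggregatedValueFixedSize() contract: SUMPRECISION is reported as fixed-size only when an explicit precision argument is supplied. A minimal sketch of checking that through the factory, reusing only the calls that appear in the test (the wrapping class is illustrative only):

    import java.util.List;
    import org.apache.pinot.common.request.Literal;
    import org.apache.pinot.common.request.context.ExpressionContext;
    import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
    import org.apache.pinot.segment.spi.AggregationFunctionType;

    public class FixedSizeCheckExample {
      public static void main(String[] args) {
        // Without a precision argument the aggregated BigDecimal can grow, so it is not fixed-size.
        boolean unbounded = ValueAggregatorFactory
            .getValueAggregator(AggregationFunctionType.SUMPRECISION, List.of())
            .isAggregatedValueFixedSize();
        // With an explicit precision literal the aggregated value is fixed-size.
        boolean bounded = ValueAggregatorFactory
            .getValueAggregator(AggregationFunctionType.SUMPRECISION,
                List.of(ExpressionContext.forLiteral(Literal.intValue(20))))
            .isAggregatedValueFixedSize();
        System.out.println(unbounded + " " + bounded);  // expected: false true
      }
    }
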
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
index eb8fbc30d14..eab8784a9e0 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
@@ -18,31 +18,29 @@
  */
 package org.apache.pinot.segment.local.indexsegment.mutable;
 
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import org.apache.pinot.common.request.Literal;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.segment.local.aggregator.DistinctCountHLLValueAggregator;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 
 
 public class MutableSegmentImplIngestionAggregationTest {
@@ -61,18 +59,17 @@ public class MutableSegmentImplIngestionAggregationTest {
 
   private static Schema.SchemaBuilder getSchemaBuilder() {
     return new Schema.SchemaBuilder().setSchemaName("testSchema")
-        .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
-        .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING)
-        .addDateTime(TIME_COLUMN1, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
-        .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
+        .addSingleValueDimension(DIMENSION_1, DataType.INT)
+        .addSingleValueDimension(DIMENSION_2, DataType.STRING)
+        .addDateTime(TIME_COLUMN1, DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .addDateTime(TIME_COLUMN2, DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
   }
 
-  private static final Set<String> VAR_LENGTH_SET = Collections.singleton(DIMENSION_2);
-  private static final Set<String> INVERTED_INDEX_SET =
-      new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2));
+  private static final Set<String> VAR_LENGTH_SET = Set.of(DIMENSION_2);
+  private static final Set<String> INVERTED_INDEX_SET = Set.of(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2);
 
   private static final List<String> STRING_VALUES =
-      Collections.unmodifiableList(Arrays.asList("aa", "bbb", "cc", "ddd", "ee", "fff", "gg", "hhh", "ii", "jjj"));
+      List.of("aa", "bbb", "cc", "ddd", "ee", "fff", "gg", "hhh", "ii", "jjj");
 
   @Test
   public void testSameSrcDifferentAggregations()
@@ -80,12 +77,10 @@ public class MutableSegmentImplIngestionAggregationTest {
     String m1 = "metric_MAX";
     String m2 = "metric_MIN";
 
-    Schema schema =
-        getSchemaBuilder().addMetric(m2, FieldSpec.DataType.DOUBLE).addMetric(m1, FieldSpec.DataType.DOUBLE).build();
+    Schema schema = getSchemaBuilder().addMetric(m2, DataType.DOUBLE).addMetric(m1, DataType.DOUBLE).build();
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m2, m1)),
-            VAR_LENGTH_SET, INVERTED_INDEX_SET,
-            Arrays.asList(new AggregationConfig(m1, "MAX(metric)"), new AggregationConfig(m2, "MIN(metric)")));
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "MAX(metric)"), new AggregationConfig(m2, "MIN(metric)")));
 
     Map<String, Double> expectedMin = new HashMap<>();
     Map<String, Double> expectedMax = new HashMap<>();
@@ -98,12 +93,13 @@ public class MutableSegmentImplIngestionAggregationTest {
               (Integer) metrics.get(0).getValue()));
     }
 
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
     GenericRow reuse = new GenericRow();
-    for (int docId = 0; docId < expectedMax.size(); docId++) {
+    for (int docId = 0; docId < numDocsIndexed; docId++) {
       GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
       String key = buildKey(row);
-      Assert.assertEquals(row.getValue(m2), expectedMin.get(key), key);
-      Assert.assertEquals(row.getValue(m1), expectedMax.get(key), key);
+      assertEquals(row.getValue(m2), expectedMin.get(key), key);
+      assertEquals(row.getValue(m1), expectedMax.get(key), key);
     }
 
     mutableSegmentImpl.destroy();
@@ -115,12 +111,10 @@ public class MutableSegmentImplIngestionAggregationTest {
     String m1 = "sum1";
     String m2 = "sum2";
 
-    Schema schema =
-        getSchemaBuilder().addMetric(m1, FieldSpec.DataType.INT).addMetric(m2, FieldSpec.DataType.LONG).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.INT).addMetric(m2, DataType.LONG).build();
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m2, m1)),
-            VAR_LENGTH_SET, INVERTED_INDEX_SET,
-            Arrays.asList(new AggregationConfig(m1, "SUM(metric)"), new AggregationConfig(m2, "SUM(metric_2)")));
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "SUM(metric)"), new AggregationConfig(m2, "SUM(metric_2)")));
 
     Map<String, Integer> expectedSum1 = new HashMap<>();
     Map<String, Long> expectedSum2 = new HashMap<>();
@@ -131,38 +125,44 @@ public class MutableSegmentImplIngestionAggregationTest {
           expectedSum2.getOrDefault(metrics.get(1).getKey(), 0L) + ((Integer) 
metrics.get(1).getValue()).longValue());
     }
 
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
     GenericRow reuse = new GenericRow();
-    for (int docId = 0; docId < expectedSum1.size(); docId++) {
+    for (int docId = 0; docId < numDocsIndexed; docId++) {
       GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
       String key = buildKey(row);
-      Assert.assertEquals(row.getValue(m1), expectedSum1.get(key), key);
-      Assert.assertEquals(row.getValue(m2), expectedSum2.get(key), key);
+      assertEquals(row.getValue(m1), expectedSum1.get(key), key);
+      assertEquals(row.getValue(m2), expectedSum2.get(key), key);
     }
 
     mutableSegmentImpl.destroy();
   }
 
   @Test
-  public void testValuesAreNullThrowsException()
+  public void testNullValues()
       throws Exception {
     String m1 = "sum1";
 
-    Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.INT).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.INT).build();
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
-            INVERTED_INDEX_SET, Collections.singletonList(new AggregationConfig(m1, "SUM(metric)")));
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "SUM(metric)")));
 
     long seed = 2;
     Random random = new Random(seed);
 
-    // Generate random int to prevent overflow
-    GenericRow row = getRow(random, 1);
-    row.putValue(METRIC, null);
-    try {
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = getRow(random, 1);
       mutableSegmentImpl.index(row, METADATA);
-      Assert.fail();
-    } catch (NullPointerException e) {
-      // expected
+    }
+
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+    assertTrue(numDocsIndexed < NUM_ROWS);
+
+    GenericRow reuse = new GenericRow();
+    for (int i = 0; i < numDocsIndexed; i++) {
+      GenericRow row = mutableSegmentImpl.getRecord(i, reuse);
+      String key = buildKey(row);
+      assertEquals(row.getValue(m1), 0, key);
     }
 
     mutableSegmentImpl.destroy();
@@ -173,58 +173,25 @@ public class MutableSegmentImplIngestionAggregationTest {
       throws Exception {
     String m1 = "hll1";
 
-    Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BYTES).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.BYTES).build();
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
-            INVERTED_INDEX_SET, Collections.singletonList(new AggregationConfig(m1, "distinctCountHLL(metric, 12)")));
-
-    Map<String, HLLTestData> expected = new HashMap<>();
-    List<Metric> metrics = addRowsDistinctCountHLL(998, mutableSegmentImpl);
-    for (Metric metric : metrics) {
-      expected.put(metric.getKey(), (HLLTestData) metric.getValue());
-    }
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "distinctCountHLL(metric, 12)")));
 
-    List<ExpressionContext> arguments = List.of(ExpressionContext.forLiteral(Literal.stringValue("12")));
-    DistinctCountHLLValueAggregator valueAggregator = new DistinctCountHLLValueAggregator(arguments);
+    Map<String, HyperLogLog> expectedValues = addRowsDistinctCountHLL(998, mutableSegmentImpl);
 
-    Set<Integer> integers = new HashSet<>();
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+    assertTrue(numDocsIndexed < NUM_ROWS);
+    assertEquals(numDocsIndexed, expectedValues.size());
 
-    // Assert that the distinct count is within an error margin. We assert on the cardinality of the HLL in the docID
-    // and the HLL we made, but also on the cardinality of the HLL in the docID and the actual cardinality from the set
-    // of integers.
     GenericRow reuse = new GenericRow();
-    for (int docId = 0; docId < expected.size(); docId++) {
+    for (int docId = 0; docId < numDocsIndexed; docId++) {
       GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
-      String key = buildKey(row);
-
-      integers.addAll(expected.get(key)._integers);
-
-      HyperLogLog expectedHLL = expected.get(key)._hll;
-      HyperLogLog actualHLL = valueAggregator.deserializeAggregatedValue((byte[]) row.getValue(m1));
-
-      Assert.assertEquals(actualHLL.cardinality(), expectedHLL.cardinality(), (int) (expectedHLL.cardinality() * 0.04),
-          "The HLL cardinality from the index is within a tolerable error margin (4%) of the cardinality of the "
-              + "expected HLL.");
-      Assert.assertEquals(actualHLL.cardinality(), expected.get(key)._integers.size(),
-          expected.get(key)._integers.size() * 0.04,
-          "The HLL cardinality from the index is within a tolerable error margin (4%) of the actual cardinality of "
-              + "the integers.");
+      HyperLogLog actualHLL = CustomSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) row.getValue(m1));
+      HyperLogLog expectedHLL = expectedValues.get(buildKey(row));
+      assertEquals(actualHLL.cardinality(), expectedHLL.cardinality());
     }
 
-    // Assert that the aggregated HyperLogLog is also within the error margin
-    HyperLogLog togetherHLL = new HyperLogLog(12);
-    expected.forEach((key, value) -> {
-      try {
-        togetherHLL.addAll(value._hll);
-      } catch (CardinalityMergeException e) {
-        e.printStackTrace();
-        throw new RuntimeException(e);
-      }
-    });
-
-    Assert.assertEquals(togetherHLL.cardinality(), integers.size(), (int) (integers.size() * 0.04),
-        "The aggregated HLL cardinality is within a tolerable error margin (4%) of the actual cardinality of the "
-            + "integers.");
     mutableSegmentImpl.destroy();
   }
 
@@ -234,24 +201,23 @@ public class MutableSegmentImplIngestionAggregationTest {
     String m1 = "count1";
     String m2 = "count2";
 
-    Schema schema =
-        getSchemaBuilder().addMetric(m1, FieldSpec.DataType.LONG).addMetric(m2, FieldSpec.DataType.LONG).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.LONG).addMetric(m2, DataType.LONG).build();
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m1, m2)),
-            VAR_LENGTH_SET, INVERTED_INDEX_SET,
-            Arrays.asList(new AggregationConfig(m1, "COUNT(metric)"), new AggregationConfig(m2, "COUNT(*)")));
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "COUNT(metric)"), new AggregationConfig(m2, "COUNT(*)")));
 
     Map<String, Long> expectedCount = new HashMap<>();
     for (List<Metric> metrics : addRows(3, mutableSegmentImpl)) {
       expectedCount.put(metrics.get(0).getKey(), expectedCount.getOrDefault(metrics.get(0).getKey(), 0L) + 1L);
     }
 
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
     GenericRow reuse = new GenericRow();
-    for (int docId = 0; docId < expectedCount.size(); docId++) {
+    for (int docId = 0; docId < numDocsIndexed; docId++) {
       GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
       String key = buildKey(row);
-      Assert.assertEquals(row.getValue(m1), expectedCount.get(key), key);
-      Assert.assertEquals(row.getValue(m2), expectedCount.get(key), key);
+      assertEquals(row.getValue(m1), expectedCount.get(key), key);
+      assertEquals(row.getValue(m2), expectedCount.get(key), key);
     }
 
     mutableSegmentImpl.destroy();
@@ -262,7 +228,7 @@ public class MutableSegmentImplIngestionAggregationTest {
         TIME_COLUMN1) + KEY_SEPARATOR + row.getValue(TIME_COLUMN2);
   }
 
-  private GenericRow getRow(Random random, Integer multiplicationFactor) {
+  private GenericRow getRow(Random random, int multiplicationFactor) {
     GenericRow row = new GenericRow();
 
     row.putValue(DIMENSION_1, random.nextInt(2 * multiplicationFactor));
@@ -273,126 +239,66 @@ public class MutableSegmentImplIngestionAggregationTest {
     return row;
   }
 
-  private class HLLTestData {
-    private HyperLogLog _hll;
-    private Set<Integer> _integers;
-
-    public HLLTestData(HyperLogLog hll, Set<Integer> integers) {
-      _hll = hll;
-      _integers = integers;
-    }
-
-    public HyperLogLog getHll() {
-      return _hll;
-    }
-
-    public Set<Integer> getIntegers() {
-      return _integers;
-    }
-  }
-
-  private class Metric {
-    private final String _key;
-    private final Object _value;
+  private static class Metric {
+    final String _key;
+    final Object _value;
 
     Metric(String key, Object value) {
       _key = key;
       _value = value;
     }
 
-    public String getKey() {
+    String getKey() {
       return _key;
     }
 
-    public Object getValue() {
+    Object getValue() {
       return _value;
     }
   }
 
-  private List<Metric> addRowsDistinctCountHLL(long seed, MutableSegmentImpl mutableSegmentImpl)
+  private Map<String, HyperLogLog> addRowsDistinctCountHLL(long seed, MutableSegmentImpl mutableSegmentImpl)
       throws Exception {
-    List<Metric> metrics = new ArrayList<>();
+    Map<String, HyperLogLog> valueMap = new HashMap<>();
 
     Random random = new Random(seed);
-
-    HashMap<String, HyperLogLog> hllMap = new HashMap<>();
-    HashMap<String, Set<Integer>> distinctMap = new HashMap<>();
-
-    Integer rows = 500000;
-
-    for (int i = 0; i < (rows); i++) {
+    for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = getRow(random, 1);
       String key = buildKey(row);
 
       int metricValue = random.nextInt(5000000);
       row.putValue(METRIC, metricValue);
-
-      if (hllMap.containsKey(key)) {
-        hllMap.get(key).offer(row.getValue(METRIC));
-        distinctMap.get(key).add(metricValue);
-      } else {
-        HyperLogLog hll = new HyperLogLog(12);
-        hll.offer(row.getValue(METRIC));
-        hllMap.put(key, hll);
-        distinctMap.put(key, new HashSet<>(metricValue));
-      }
-
       mutableSegmentImpl.index(row, METADATA);
-    }
 
-    distinctMap.forEach(
-        (key, value) -> metrics.add(new Metric(key, new HLLTestData(hllMap.get(key), distinctMap.get(key)))));
-
-    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
-    Assert.assertEquals(numDocsIndexed, hllMap.keySet().size());
-
-    // Assert that aggregation happened.
-    Assert.assertTrue(numDocsIndexed < NUM_ROWS);
+      valueMap.computeIfAbsent(key, k -> new HyperLogLog(12)).offer(metricValue);
+    }
 
-    return metrics;
+    return valueMap;
   }
 
-  private List<Metric> addRowsSumPrecision(long seed, MutableSegmentImpl mutableSegmentImpl)
+  private Map<String, BigDecimal> addRowsSumPrecision(long seed, MutableSegmentImpl mutableSegmentImpl)
       throws Exception {
-    List<Metric> metrics = new ArrayList<>();
+    Map<String, BigDecimal> valueMap = new HashMap<>();
 
     Random random = new Random(seed);
-
-    HashMap<String, BigDecimal> bdMap = new HashMap<>();
-    HashMap<String, ArrayList<BigDecimal>> bdIndividualMap = new HashMap<>();
-
-    int numRows = 50000;
-    for (int i = 0; i < numRows; i++) {
+    for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = getRow(random, 1);
       String key = buildKey(row);
 
       BigDecimal metricValue = generateRandomBigDecimal(random, 5, 6);
-      row.putValue(METRIC, metricValue.toString());
-
-      if (bdMap.containsKey(key)) {
-        bdMap.put(key, bdMap.get(key).add(metricValue));
-        bdIndividualMap.get(key).add(metricValue);
-      } else {
-        bdMap.put(key, metricValue);
-        ArrayList<BigDecimal> bdList = new ArrayList<>();
-        bdList.add(metricValue);
-        bdIndividualMap.put(key, bdList);
-      }
-
+      row.putValue(METRIC, metricValue);
       mutableSegmentImpl.index(row, METADATA);
-    }
 
-    for (String key : bdMap.keySet()) {
-      metrics.add(new Metric(key, bdMap.get(key)));
+      valueMap.compute(key, (k, v) -> {
+        if (v == null) {
+          return metricValue;
+        } else {
+          return v.add(metricValue);
+        }
+      });
     }
 
-    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
-    Assert.assertEquals(numDocsIndexed, bdMap.keySet().size());
-
-    // Assert that aggregation happened.
-    Assert.assertTrue(numDocsIndexed < NUM_ROWS);
-
-    return metrics;
+    return valueMap;
   }
 
   private List<List<Metric>> addRows(long seed, MutableSegmentImpl mutableSegmentImpl)
@@ -403,7 +309,6 @@ public class MutableSegmentImplIngestionAggregationTest {
     Random random = new Random(seed);
 
     for (int i = 0; i < NUM_ROWS; i++) {
-      // Generate random int to prevent overflow
       GenericRow row = getRow(random, 1);
       Integer metricValue = random.nextInt(10000);
       Integer metric2Value = random.nextInt();
@@ -413,15 +318,15 @@ public class MutableSegmentImplIngestionAggregationTest {
       mutableSegmentImpl.index(row, METADATA);
 
       String key = buildKey(row);
-      metrics.add(Arrays.asList(new Metric(key, metricValue), new Metric(key, metric2Value)));
+      metrics.add(List.of(new Metric(key, metricValue), new Metric(key, metric2Value)));
       keys.add(key);
     }
 
     int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
-    Assert.assertEquals(numDocsIndexed, keys.size());
+    assertEquals(numDocsIndexed, keys.size());
 
     // Assert that aggregation happened.
-    Assert.assertTrue(numDocsIndexed < NUM_ROWS);
+    assertTrue(numDocsIndexed < NUM_ROWS);
 
     return metrics;
   }
@@ -430,70 +335,61 @@ public class MutableSegmentImplIngestionAggregationTest {
   public void testSumPrecision()
       throws Exception {
     String m1 = "sumPrecision1";
-    Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BIG_DECIMAL).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.BIG_DECIMAL).build();
 
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
-            INVERTED_INDEX_SET,
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
             // Setting precision to 38 in the arguments for SUM_PRECISION
-            Collections.singletonList(new AggregationConfig(m1, "SUM_PRECISION(metric, 38)")));
+            List.of(new AggregationConfig(m1, "SUM_PRECISION(metric, 38)")));
 
-    Map<String, BigDecimal> expected = new HashMap<>();
-    List<Metric> metrics = addRowsSumPrecision(998, mutableSegmentImpl);
-    for (Metric metric : metrics) {
-      expected.put(metric.getKey(), (BigDecimal) metric.getValue());
-    }
+    Map<String, BigDecimal> expectedValues = addRowsSumPrecision(998, mutableSegmentImpl);
+
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+    assertTrue(numDocsIndexed < NUM_ROWS);
+    assertEquals(numDocsIndexed, expectedValues.size());
 
-    // Assert that the aggregated values are correct
     GenericRow reuse = new GenericRow();
-    for (int docId = 0; docId < expected.size(); docId++) {
+    for (int docId = 0; docId < numDocsIndexed; docId++) {
       GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
-      String key = buildKey(row);
-
-      BigDecimal expectedBigDecimal = expected.get(key);
       BigDecimal actualBigDecimal = (BigDecimal) row.getValue(m1);
-
-      Assert.assertEquals(actualBigDecimal, expectedBigDecimal, "The aggregated SUM does not match the expected SUM");
+      BigDecimal expectedBigDecimal = expectedValues.get(buildKey(row));
+      assertEquals(actualBigDecimal, expectedBigDecimal);
     }
+
     mutableSegmentImpl.destroy();
   }
 
   @Test
   public void testBigDecimalTooBig() {
     String m1 = "sumPrecision1";
-    Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BIG_DECIMAL).build();
+    Schema schema = getSchemaBuilder().addMetric(m1, DataType.BIG_DECIMAL).build();
 
     int seed = 1;
     Random random = new Random(seed);
 
     MutableSegmentImpl mutableSegmentImpl =
-        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
-            INVERTED_INDEX_SET, Collections.singletonList(new AggregationConfig(m1, "SUM_PRECISION(metric, 3)")));
+        MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+            List.of(new AggregationConfig(m1, "SUM_PRECISION(metric, 3)")));
 
     // Make a big decimal larger than 3 precision and try to index it
     BigDecimal large = BigDecimalUtils.generateMaximumNumberWithPrecision(5);
     GenericRow row = getRow(random, 1);
 
     row.putValue("metric", large);
-    Assert.assertThrows(IllegalArgumentException.class, () -> {
+    assertThrows(IllegalArgumentException.class, () -> {
       mutableSegmentImpl.index(row, METADATA);
     });
 
     mutableSegmentImpl.destroy();
   }
 
-  private BigDecimal generateRandomBigDecimal(Random random, int maxPrecision, int scale) {
+  private static BigDecimal generateRandomBigDecimal(Random random, int maxPrecision, int scale) {
     int precision = 1 + random.nextInt(maxPrecision);
-
-    String s = "";
+    StringBuilder stringBuilder = new StringBuilder();
     for (int i = 0; i < precision; i++) {
-      s = s + (1 + random.nextInt(9));
-    }
-
-    if ((1 + random.nextInt(2)) == 1) {
-      return (new BigDecimal(s).setScale(scale)).negate();
-    } else {
-      return new BigDecimal(s).setScale(scale);
+      stringBuilder.append(1 + random.nextInt(9));
     }
+    BigDecimal bigDecimal = new BigDecimal(stringBuilder.toString()).setScale(scale, RoundingMode.UNNECESSARY);
+    return random.nextBoolean() ? bigDecimal : bigDecimal.negate();
   }
 }
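
One detail in the refactored generateRandomBigDecimal helper: setScale(scale, RoundingMode.UNNECESSARY) only pads the value with trailing zeros and throws ArithmeticException if rounding would actually be required, which cannot happen here because the generated digit string has no fractional part. A standalone sketch of that behavior (class name and values are illustrative only):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class SetScaleUnnecessaryExample {
      public static void main(String[] args) {
        // Padding a whole number to scale 6 never requires rounding: 945 -> 945.000000
        System.out.println(new BigDecimal("945").setScale(6, RoundingMode.UNNECESSARY));
        // If rounding would be needed, UNNECESSARY throws instead of silently changing the value.
        try {
          new BigDecimal("9.45").setScale(1, RoundingMode.UNNECESSARY);
        } catch (ArithmeticException e) {
          System.out.println("Rounding required: " + e.getMessage());
        }
      }
    }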


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
