This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 642bf00501e Enhance ValueAggregator (#16552)
642bf00501e is described below
commit 642bf00501ef0cc0ddb79ade00b2eff695590ea0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 8 18:45:14 2025 -0600
Enhance ValueAggregator (#16552)
---
.../local/aggregator/AvgValueAggregator.java | 11 +-
.../local/aggregator/CountValueAggregator.java | 10 +-
.../DistinctCountBitmapValueAggregator.java | 7 +
.../DistinctCountCPCSketchValueAggregator.java | 26 +-
.../DistinctCountHLLPlusValueAggregator.java | 11 +-
.../DistinctCountHLLValueAggregator.java | 11 +-
.../DistinctCountThetaSketchValueAggregator.java | 7 +
.../DistinctCountULLValueAggregator.java | 11 +-
.../IntegerTupleSketchValueAggregator.java | 13 +-
.../local/aggregator/MaxValueAggregator.java | 11 +-
.../aggregator/MinMaxRangeValueAggregator.java | 11 +-
.../local/aggregator/MinValueAggregator.java | 11 +-
.../aggregator/PercentileEstValueAggregator.java | 7 +
.../PercentileTDigestValueAggregator.java | 7 +
.../aggregator/SumPrecisionValueAggregator.java | 11 +-
.../local/aggregator/SumValueAggregator.java | 11 +-
.../segment/local/aggregator/ValueAggregator.java | 11 +-
.../local/aggregator/ValueAggregatorFactory.java | 24 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 117 +++++---
.../startree/v2/builder/BaseSingleTreeBuilder.java | 17 +-
.../segment/local/utils/TableConfigUtils.java | 30 +-
.../DistinctCountCPCSketchValueAggregatorTest.java | 6 +
.../DistinctCountULLValueAggregatorTest.java | 6 +
.../IntegerTupleSketchValueAggregatorTest.java | 7 +
.../local/aggregator/ValueAggregatorTest.java | 62 ++++
...MutableSegmentImplIngestionAggregationTest.java | 322 +++++++--------------
26 files changed, 462 insertions(+), 316 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
index 7ba199642e4..b999d49ea7e 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/AvgValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -38,7 +39,10 @@ public class AvgValueAggregator implements
ValueAggregator<Object, AvgPair> {
}
@Override
- public AvgPair getInitialAggregatedValue(Object rawValue) {
+ public AvgPair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new AvgPair();
+ }
if (rawValue instanceof byte[]) {
return deserializeAggregatedValue((byte[]) rawValue);
} else {
@@ -67,6 +71,11 @@ public class AvgValueAggregator implements
ValueAggregator<Object, AvgPair> {
return new AvgPair(value.getSum(), value.getCount());
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES + Long.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
index 177578935b7..2b7e01a6575 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/CountValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -36,8 +37,8 @@ public class CountValueAggregator implements
ValueAggregator<Object, Long> {
}
@Override
- public Long getInitialAggregatedValue(Object rawValue) {
- return 1L;
+ public Long getInitialAggregatedValue(@Nullable Object rawValue) {
+ return rawValue != null ? 1L : 0L;
}
@Override
@@ -55,6 +56,11 @@ public class CountValueAggregator implements
ValueAggregator<Object, Long> {
return value;
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Long.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
index 5da5ae1963c..64cceab6bb5 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountBitmapValueAggregator.java
@@ -41,6 +41,8 @@ public class DistinctCountBitmapValueAggregator implements
ValueAggregator<Objec
@Override
public RoaringBitmap getInitialAggregatedValue(Object rawValue) {
+ // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+ assert rawValue != null;
RoaringBitmap initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -77,6 +79,11 @@ public class DistinctCountBitmapValueAggregator implements
ValueAggregator<Objec
return value.clone();
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return false;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
index 6bc816f50ab..7ff2a636de7 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.aggregator;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -54,8 +55,11 @@ public class DistinctCountCPCSketchValueAggregator
implements ValueAggregator<Ob
}
@Override
- public Object getInitialAggregatedValue(Object rawValue) {
+ public Object getInitialAggregatedValue(@Nullable Object rawValue) {
CpcUnion cpcUnion = new CpcUnion(_lgK);
+ if (rawValue == null) {
+ return cpcUnion;
+ }
if (rawValue instanceof byte[]) { // Serialized Sketch
byte[] bytes = (byte[]) rawValue;
cpcUnion.update(deserializeAggregatedValue(bytes));
@@ -100,6 +104,11 @@ public class DistinctCountCPCSketchValueAggregator
implements ValueAggregator<Ob
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return CpcSketch.getMaxSerializedBytes(_lgK);
@@ -121,21 +130,6 @@ public class DistinctCountCPCSketchValueAggregator
implements ValueAggregator<Ob
return _lgK;
}
- private CpcSketch union(CpcSketch left, CpcSketch right) {
- if (left == null && right == null) {
- return empty();
- } else if (left == null) {
- return right;
- } else if (right == null) {
- return left;
- }
-
- CpcUnion union = new CpcUnion(_lgK);
- union.update(left);
- union.update(right);
- return union.getResult();
- }
-
private void addObjectToSketch(Object rawValue, CpcSketch sketch) {
if (rawValue instanceof String) {
sketch.update((String) rawValue);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
index a4857a1b8de..6c1a50e5c57 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java
@@ -22,6 +22,7 @@ import
com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.local.utils.HyperLogLogPlusUtils;
@@ -64,7 +65,10 @@ public class DistinctCountHLLPlusValueAggregator implements
ValueAggregator<Obje
}
@Override
- public HyperLogLogPlus getInitialAggregatedValue(Object rawValue) {
+ public HyperLogLogPlus getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new HyperLogLogPlus(_p, _sp);
+ }
HyperLogLogPlus initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -107,6 +111,11 @@ public class DistinctCountHLLPlusValueAggregator
implements ValueAggregator<Obje
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
// NOTE: For aggregated metrics, initial aggregated value might have not
been generated. Returns the byte size
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
index 0ca535c9d8e..6414231f913 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
@@ -22,6 +22,7 @@ import
com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.local.utils.HyperLogLogUtils;
@@ -58,7 +59,10 @@ public class DistinctCountHLLValueAggregator implements
ValueAggregator<Object,
}
@Override
- public HyperLogLog getInitialAggregatedValue(Object rawValue) {
+ public HyperLogLog getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new HyperLogLog(_log2m);
+ }
HyperLogLog initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -101,6 +105,11 @@ public class DistinctCountHLLValueAggregator implements
ValueAggregator<Object,
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
// NOTE: For aggregated metrics, initial aggregated value might have not
been generated. Returns the byte size
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
index c19bb0a0f17..5dba35d637c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregator.java
@@ -116,6 +116,8 @@ public class DistinctCountThetaSketchValueAggregator
implements ValueAggregator<
@Override
public Object getInitialAggregatedValue(Object rawValue) {
+ // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+ assert rawValue != null;
Union thetaUnion = _setOperationBuilder.buildUnion();
if (rawValue instanceof byte[]) { // Serialized Sketch
byte[] bytes = (byte[]) rawValue;
@@ -175,6 +177,11 @@ public class DistinctCountThetaSketchValueAggregator
implements ValueAggregator<
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return false;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
index d0447307151..cfc7a91cccf 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.aggregator;
import com.dynatrace.hash4j.distinctcount.UltraLogLog;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
@@ -54,7 +55,10 @@ public class DistinctCountULLValueAggregator implements
ValueAggregator<Object,
}
@Override
- public UltraLogLog getInitialAggregatedValue(Object rawValue) {
+ public UltraLogLog getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return UltraLogLog.create(_p);
+ }
UltraLogLog initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -83,6 +87,11 @@ public class DistinctCountULLValueAggregator implements
ValueAggregator<Object,
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return (1 << _p) + 1;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
index f5294db788d..e5db940dbcd 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.aggregator;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -60,9 +61,12 @@ public class IntegerTupleSketchValueAggregator implements
ValueAggregator<byte[]
}
@Override
- public Object getInitialAggregatedValue(byte[] rawValue) {
- Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
+ public Object getInitialAggregatedValue(@Nullable byte[] rawValue) {
Union tupleUnion = new Union<>(_nominalEntries, new
IntegerSummarySetOperations(_mode, _mode));
+ if (rawValue == null) {
+ return tupleUnion;
+ }
+ Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
tupleUnion.union(initialValue);
return tupleUnion;
}
@@ -114,6 +118,11 @@ public class IntegerTupleSketchValueAggregator implements
ValueAggregator<byte[]
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
/**
* Returns the maximum number of storage bytes required for a Compact
Integer Tuple Sketch with the given
* number of actual entries. Note that this assumes the worst case of the
sketch in
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
index f5b51833fae..d5fcce740a6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MaxValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -36,7 +37,10 @@ public class MaxValueAggregator implements
ValueAggregator<Object, Double> {
}
@Override
- public Double getInitialAggregatedValue(Object rawValue) {
+ public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return Double.NEGATIVE_INFINITY;
+ }
return ValueAggregatorUtils.toDouble(rawValue);
}
@@ -55,6 +59,11 @@ public class MaxValueAggregator implements
ValueAggregator<Object, Double> {
return value;
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
index d6aeb1195ea..2f47ba7d280 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -38,7 +39,10 @@ public class MinMaxRangeValueAggregator implements
ValueAggregator<Object, MinMa
}
@Override
- public MinMaxRangePair getInitialAggregatedValue(Object rawValue) {
+ public MinMaxRangePair getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return new MinMaxRangePair();
+ }
if (rawValue instanceof byte[]) {
return deserializeAggregatedValue((byte[]) rawValue);
} else {
@@ -68,6 +72,11 @@ public class MinMaxRangeValueAggregator implements
ValueAggregator<Object, MinMa
return new MinMaxRangePair(value.getMin(), value.getMax());
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES + Double.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
index 2ebc95c123c..4c4edf62d65 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -36,7 +37,10 @@ public class MinValueAggregator implements
ValueAggregator<Object, Double> {
}
@Override
- public Double getInitialAggregatedValue(Object rawValue) {
+ public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return Double.POSITIVE_INFINITY;
+ }
return ValueAggregatorUtils.toDouble(rawValue);
}
@@ -55,6 +59,11 @@ public class MinValueAggregator implements
ValueAggregator<Object, Double> {
return value;
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
index 4915461b87e..739b0a82b76 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileEstValueAggregator.java
@@ -44,6 +44,8 @@ public class PercentileEstValueAggregator implements
ValueAggregator<Object, Qua
@Override
public QuantileDigest getInitialAggregatedValue(Object rawValue) {
+ // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+ assert rawValue != null;
QuantileDigest initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -92,6 +94,11 @@ public class PercentileEstValueAggregator implements
ValueAggregator<Object, Qua
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return false;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
index 054b52f9bd7..0e1a2caf342 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/PercentileTDigestValueAggregator.java
@@ -54,6 +54,8 @@ public class PercentileTDigestValueAggregator implements
ValueAggregator<Object,
@Override
public TDigest getInitialAggregatedValue(Object rawValue) {
+ // NOTE: rawValue cannot be null because this aggregator can only be used for star-tree index.
+ assert rawValue != null;
TDigest initialValue;
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
@@ -90,6 +92,11 @@ public class PercentileTDigestValueAggregator implements
ValueAggregator<Object,
return deserializeAggregatedValue(serializeAggregatedValue(value));
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return false;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
index f8074129838..1d97da7b263 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.aggregator;
import com.google.common.base.Preconditions;
import java.math.BigDecimal;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -58,7 +59,10 @@ public class SumPrecisionValueAggregator implements
ValueAggregator<Object, BigD
}
@Override
- public BigDecimal getInitialAggregatedValue(Object rawValue) {
+ public BigDecimal getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return BigDecimal.ZERO;
+ }
BigDecimal initialValue = toBigDecimal(rawValue);
if (_fixedSize < 0) {
_maxByteSize = Math.max(_maxByteSize,
BigDecimalUtils.byteSize(initialValue));
@@ -100,6 +104,11 @@ public class SumPrecisionValueAggregator implements
ValueAggregator<Object, BigD
return value;
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return _fixedSize > 0;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
Preconditions.checkState(_fixedSize > 0 || _maxByteSize > 0,
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
index 1f79b6cc6c8..524c08c3396 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -36,7 +37,10 @@ public class SumValueAggregator implements
ValueAggregator<Object, Double> {
}
@Override
- public Double getInitialAggregatedValue(Object rawValue) {
+ public Double getInitialAggregatedValue(@Nullable Object rawValue) {
+ if (rawValue == null) {
+ return 0.0;
+ }
return ValueAggregatorUtils.toDouble(rawValue);
}
@@ -55,6 +59,11 @@ public class SumValueAggregator implements
ValueAggregator<Object, Double> {
return value;
}
+ @Override
+ public boolean isAggregatedValueFixedSize() {
+ return true;
+ }
+
@Override
public int getMaxAggregatedValueByteSize() {
return Double.BYTES;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
index 7743fc7c314..20b8c3971cd 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -42,8 +43,10 @@ public interface ValueAggregator<R, A> {
/**
* Returns the initial aggregated value.
+ * <p>NOTE: rawValue can be null when the aggregator is used for ingestion aggregation, and the column is not
+ * specified in the schema.
*/
- A getInitialAggregatedValue(R rawValue);
+ A getInitialAggregatedValue(@Nullable R rawValue);
/**
* Applies a raw value to the current aggregated value.
@@ -62,6 +65,12 @@ public interface ValueAggregator<R, A> {
*/
A cloneAggregatedValue(A value);
+ /**
+ * Returns whether the aggregated value is of fixed size. Value aggregator can be used for ingestion aggregation only
+ * when the aggregated value is of fixed size.
+ */
+ boolean isAggregatedValueFixedSize();
+
/**
* Returns the maximum size in bytes of the aggregated values seen so far.
*/
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 2ba883854f9..4ada0134954 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -61,18 +61,15 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTRAWHLL:
return new DistinctCountHLLValueAggregator(arguments);
- case PERCENTILEEST:
- case PERCENTILERAWEST:
- return new PercentileEstValueAggregator();
- case PERCENTILETDIGEST:
- case PERCENTILERAWTDIGEST:
- return new PercentileTDigestValueAggregator(arguments);
- case DISTINCTCOUNTTHETASKETCH:
- case DISTINCTCOUNTRAWTHETASKETCH:
- return new DistinctCountThetaSketchValueAggregator(arguments);
case DISTINCTCOUNTHLLPLUS:
case DISTINCTCOUNTRAWHLLPLUS:
return new DistinctCountHLLPlusValueAggregator(arguments);
+ case DISTINCTCOUNTULL:
+ case DISTINCTCOUNTRAWULL:
+ return new DistinctCountULLValueAggregator(arguments);
+ case DISTINCTCOUNTTHETASKETCH:
+ case DISTINCTCOUNTRAWTHETASKETCH:
+ return new DistinctCountThetaSketchValueAggregator(arguments);
case DISTINCTCOUNTTUPLESKETCH:
case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
case AVGVALUEINTEGERSUMTUPLESKETCH:
@@ -81,9 +78,12 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTCPCSKETCH:
case DISTINCTCOUNTRAWCPCSKETCH:
return new DistinctCountCPCSketchValueAggregator(arguments);
- case DISTINCTCOUNTULL:
- case DISTINCTCOUNTRAWULL:
- return new DistinctCountULLValueAggregator(arguments);
+ case PERCENTILEEST:
+ case PERCENTILERAWEST:
+ return new PercentileEstValueAggregator();
+ case PERCENTILETDIGEST:
+ case PERCENTILERAWTDIGEST:
+ return new PercentileTDigestValueAggregator(arguments);
default:
throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index a3a6b31fdd1..9c4ac15a873 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.indexsegment.mutable;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.booleans.BooleanList;
@@ -75,7 +76,6 @@ import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
import org.apache.pinot.segment.local.utils.IdMap;
import org.apache.pinot.segment.local.utils.IngestionUtils;
-import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -96,6 +96,7 @@ import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap
import org.apache.pinot.segment.spi.index.mutable.provider.MutableIndexContext;
import org.apache.pinot.segment.spi.index.reader.MultiColumnTextIndexReader;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
@@ -480,7 +481,7 @@ public class MutableSegmentImpl implements MutableSegment {
private static Map<String, Pair<String, ValueAggregator>>
getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
if (segmentConfig.aggregateMetrics()) {
return fromAggregateMetrics(segmentConfig);
- } else if
(!CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
+ } else if
(CollectionUtils.isNotEmpty(segmentConfig.getIngestionAggregationConfigs())) {
return fromAggregationConfig(segmentConfig);
} else {
return Collections.emptyMap();
@@ -491,8 +492,10 @@ public class MutableSegmentImpl implements MutableSegment {
Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()),
"aggregateMetrics cannot be enabled if AggregationConfig is set");
- Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new
HashMap<>();
- for (String metricName : segmentConfig.getSchema().getMetricNames()) {
+ List<String> metricNames = segmentConfig.getSchema().getMetricNames();
+ Map<String, Pair<String, ValueAggregator>> columnNameToAggregator =
+ Maps.newHashMapWithExpectedSize(metricNames.size());
+ for (String metricName : metricNames) {
columnNameToAggregator.put(metricName, Pair.of(metricName,
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM,
Collections.emptyList())));
}
@@ -500,11 +503,11 @@ public class MutableSegmentImpl implements MutableSegment
{
}
private static Map<String, Pair<String, ValueAggregator>>
fromAggregationConfig(RealtimeSegmentConfig segmentConfig) {
- Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new
HashMap<>();
-
- Preconditions.checkState(!segmentConfig.aggregateMetrics(),
- "aggregateMetrics cannot be enabled if AggregationConfig is set");
- for (AggregationConfig config :
segmentConfig.getIngestionAggregationConfigs()) {
+ List<AggregationConfig> aggregationConfigs =
segmentConfig.getIngestionAggregationConfigs();
+ assert !segmentConfig.aggregateMetrics() &&
CollectionUtils.isNotEmpty(aggregationConfigs);
+ Map<String, Pair<String, ValueAggregator>> columnNameToAggregator =
+ Maps.newHashMapWithExpectedSize(aggregationConfigs.size());
+ for (AggregationConfig config : aggregationConfigs) {
ExpressionContext expressionContext =
RequestContextUtils.getExpression(config.getAggregationFunction());
// validation is also done when the table is created, this is just a
sanity check.
Preconditions.checkState(expressionContext.getType() ==
ExpressionContext.Type.FUNCTION,
@@ -512,14 +515,16 @@ public class MutableSegmentImpl implements MutableSegment
{
FunctionContext functionContext = expressionContext.getFunction();
AggregationFunctionType functionType =
AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
- TableConfigUtils.validateIngestionAggregation(functionType);
- ExpressionContext argument = functionContext.getArguments().get(0);
+ List<ExpressionContext> arguments = functionContext.getArguments();
+ ExpressionContext argument = arguments.get(0);
Preconditions.checkState(argument.getType() ==
ExpressionContext.Type.IDENTIFIER,
"aggregator function argument must be a identifier: %s", config);
+ ValueAggregator valueAggregator =
+ ValueAggregatorFactory.getValueAggregator(functionType,
arguments.subList(1, arguments.size()));
+ Preconditions.checkState(valueAggregator.isAggregatedValueFixedSize(),
+ "aggregator function must have fixed size aggregated value: %s",
config);
- columnNameToAggregator.put(config.getColumnName(),
Pair.of(argument.getIdentifier(),
- ValueAggregatorFactory.getValueAggregator(functionType,
- functionContext.getArguments().subList(1,
functionContext.getArguments().size()))));
+ columnNameToAggregator.put(config.getColumnName(),
Pair.of(argument.getIdentifier(), valueAggregator));
}
return columnNameToAggregator;
@@ -811,9 +816,17 @@ public class MutableSegmentImpl implements MutableSegment {
String column = entry.getKey();
IndexContainer indexContainer = entry.getValue();
- // aggregate metrics is enabled.
- if (indexContainer._valueAggregator != null) {
- Object value = row.getValue(indexContainer._sourceColumn);
+ // Handle ingestion aggregation
+ ValueAggregator valueAggregator = indexContainer._valueAggregator;
+ if (valueAggregator != null) {
+ String sourceColumn = indexContainer._sourceColumn;
+ // NOTE: value can be null if the column is not specified in the
schema.
+ Object value = row.getValue(sourceColumn);
+ // Handle COUNT(*)
+ if (value == null &&
sourceColumn.equals(AggregationFunctionColumnPair.STAR)) {
+ assert valueAggregator.getAggregationType() ==
AggregationFunctionType.COUNT;
+ value = 1;
+ }
// Update numValues info
indexContainer._valuesInfo.updateSVNumValues();
@@ -822,7 +835,7 @@ public class MutableSegmentImpl implements MutableSegment {
FieldSpec fieldSpec = indexContainer._fieldSpec;
DataType dataType = fieldSpec.getDataType();
- value =
indexContainer._valueAggregator.getInitialAggregatedValue(value);
+ value = valueAggregator.getInitialAggregatedValue(value);
// BIG_DECIMAL is actually stored as byte[] and hence can be supported
here.
switch (dataType.getStoredType()) {
case INT:
@@ -839,7 +852,7 @@ public class MutableSegmentImpl implements MutableSegment {
break;
case BIG_DECIMAL:
case BYTES:
-
forwardIndex.add(indexContainer._valueAggregator.serializeAggregatedValue(value),
-1, docId);
+ forwardIndex.add(valueAggregator.serializeAggregatedValue(value),
-1, docId);
break;
default:
throw new UnsupportedOperationException(
@@ -926,7 +939,7 @@ public class MutableSegmentImpl implements MutableSegment {
if (_multiColumnValues != null) {
int pos = _multiColumnPos.getInt(column);
if (pos > -1) {
- _multiColumnValues.set(pos, (String) value);
+ _multiColumnValues.set(pos, value);
}
}
} else {
@@ -1011,37 +1024,47 @@ public class MutableSegmentImpl implements
MutableSegment {
private void aggregateMetrics(GenericRow row, int docId) {
for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
IndexContainer indexContainer =
_indexContainerMap.get(metricFieldSpec.getName());
- Object value = row.getValue(indexContainer._sourceColumn);
+ ValueAggregator valueAggregator = indexContainer._valueAggregator;
+ String sourceColumn = indexContainer._sourceColumn;
+ // NOTE: value can be null if the column is not specified in the schema.
+ Object value = row.getValue(sourceColumn);
+ // Skip aggregation if the input value is null.
+ if (value == null) {
+ // Handle COUNT(*)
+ if (sourceColumn.equals(AggregationFunctionColumnPair.STAR)) {
+ assert valueAggregator.getAggregationType() ==
AggregationFunctionType.COUNT;
+ value = 1;
+ } else {
+ continue;
+ }
+ }
MutableForwardIndex forwardIndex =
(MutableForwardIndex)
indexContainer._mutableIndexes.get(StandardIndexes.forward());
DataType dataType = metricFieldSpec.getDataType();
- Double oldDoubleValue;
- Double newDoubleValue;
- Long oldLongValue;
- Long newLongValue;
- ValueAggregator valueAggregator = indexContainer._valueAggregator;
switch (valueAggregator.getAggregatedValueType()) {
case DOUBLE:
+ double oldDoubleValue;
+ double newDoubleValue;
switch (dataType) {
case INT:
- oldDoubleValue = ((Integer)
forwardIndex.getInt(docId)).doubleValue();
- newDoubleValue = (Double)
valueAggregator.applyRawValue(oldDoubleValue, value);
- forwardIndex.setInt(docId, newDoubleValue.intValue());
+ oldDoubleValue = forwardIndex.getInt(docId);
+ newDoubleValue = (double)
valueAggregator.applyRawValue(oldDoubleValue, value);
+ forwardIndex.setInt(docId, (int) newDoubleValue);
break;
case LONG:
- oldDoubleValue = ((Long)
forwardIndex.getLong(docId)).doubleValue();
- newDoubleValue = (Double)
valueAggregator.applyRawValue(oldDoubleValue, value);
- forwardIndex.setLong(docId, newDoubleValue.longValue());
+ oldDoubleValue = forwardIndex.getLong(docId);
+ newDoubleValue = (double)
valueAggregator.applyRawValue(oldDoubleValue, value);
+ forwardIndex.setLong(docId, (long) newDoubleValue);
break;
case FLOAT:
- oldDoubleValue = ((Float)
forwardIndex.getFloat(docId)).doubleValue();
- newDoubleValue = (Double)
valueAggregator.applyRawValue(oldDoubleValue, value);
- forwardIndex.setFloat(docId, newDoubleValue.floatValue());
+ oldDoubleValue = forwardIndex.getFloat(docId);
+ newDoubleValue = (double)
valueAggregator.applyRawValue(oldDoubleValue, value);
+ forwardIndex.setFloat(docId, (float) newDoubleValue);
break;
case DOUBLE:
oldDoubleValue = forwardIndex.getDouble(docId);
- newDoubleValue = (Double)
valueAggregator.applyRawValue(oldDoubleValue, value);
+ newDoubleValue = (double)
valueAggregator.applyRawValue(oldDoubleValue, value);
forwardIndex.setDouble(docId, newDoubleValue);
break;
default:
@@ -1050,26 +1073,28 @@ public class MutableSegmentImpl implements
MutableSegment {
}
break;
case LONG:
+ long oldLongValue;
+ long newLongValue;
switch (dataType) {
case INT:
- oldLongValue = ((Integer)
forwardIndex.getInt(docId)).longValue();
- newLongValue = (Long)
valueAggregator.applyRawValue(oldLongValue, value);
- forwardIndex.setInt(docId, newLongValue.intValue());
+ oldLongValue = forwardIndex.getInt(docId);
+ newLongValue = (long)
valueAggregator.applyRawValue(oldLongValue, value);
+ forwardIndex.setInt(docId, (int) newLongValue);
break;
case LONG:
oldLongValue = forwardIndex.getLong(docId);
- newLongValue = (Long)
valueAggregator.applyRawValue(oldLongValue, value);
+ newLongValue = (long)
valueAggregator.applyRawValue(oldLongValue, value);
forwardIndex.setLong(docId, newLongValue);
break;
case FLOAT:
- oldLongValue = ((Float)
forwardIndex.getFloat(docId)).longValue();
- newLongValue = (Long)
valueAggregator.applyRawValue(oldLongValue, value);
- forwardIndex.setFloat(docId, newLongValue.floatValue());
+ oldLongValue = (long) forwardIndex.getFloat(docId);
+ newLongValue = (long)
valueAggregator.applyRawValue(oldLongValue, value);
+ forwardIndex.setFloat(docId, (float) newLongValue);
break;
case DOUBLE:
- oldLongValue = ((Double)
forwardIndex.getDouble(docId)).longValue();
- newLongValue = (Long)
valueAggregator.applyRawValue(oldLongValue, value);
- forwardIndex.setDouble(docId, newLongValue.doubleValue());
+ oldLongValue = (long) forwardIndex.getDouble(docId);
+ newLongValue = (long)
valueAggregator.applyRawValue(oldLongValue, value);
+ forwardIndex.setDouble(docId, (double) newLongValue);
break;
default:
throw new
UnsupportedOperationException(String.format("Aggregation type %s of %s not
supported for %s",
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
index f797c077a86..517bcd4a272 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java
@@ -263,13 +263,24 @@ abstract class BaseSingleTreeBuilder implements
SingleTreeBuilder {
int[] dimensions = Arrays.copyOf(segmentRecord._dimensions,
_numDimensions);
Object[] metrics = new Object[_numMetrics];
for (int i = 0; i < _numMetrics; i++) {
- metrics[i] =
_valueAggregators[i].getInitialAggregatedValue(segmentRecord._metrics[i]);
+ Object rawValue = segmentRecord._metrics[i];
+ if (rawValue != null) {
+ metrics[i] =
_valueAggregators[i].getInitialAggregatedValue(rawValue);
+ } else {
+ assert _valueAggregators[i].getAggregationType() ==
AggregationFunctionType.COUNT;
+ metrics[i] = 1L;
+ }
}
return new Record(dimensions, metrics);
} else {
for (int i = 0; i < _numMetrics; i++) {
- aggregatedRecord._metrics[i] =
- _valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i],
segmentRecord._metrics[i]);
+ Object rawValue = segmentRecord._metrics[i];
+ if (rawValue != null) {
+ aggregatedRecord._metrics[i] =
_valueAggregators[i].applyRawValue(aggregatedRecord._metrics[i], rawValue);
+ } else {
+ assert _valueAggregators[i].getAggregationType() ==
AggregationFunctionType.COUNT;
+ aggregatedRecord._metrics[i] = ((long) aggregatedRecord._metrics[i])
+ 1;
+ }
}
return aggregatedRecord;
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index cd4833dd044..c1fdb7c690b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -44,6 +43,8 @@ import
org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import
org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
@@ -105,7 +106,9 @@ import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.segment.spi.AggregationFunctionType.*;
+import static
org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLL;
+import static
org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLLPLUS;
+import static
org.apache.pinot.segment.spi.AggregationFunctionType.SUMPRECISION;
/**
@@ -124,8 +127,6 @@ public final class TableConfigUtils {
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use
KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module
as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
- private static final EnumSet<AggregationFunctionType>
SUPPORTED_INGESTION_AGGREGATIONS =
- EnumSet.of(SUM, MIN, MAX, COUNT, DISTINCTCOUNTHLL, SUMPRECISION,
DISTINCTCOUNTHLLPLUS);
private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
@@ -418,7 +419,7 @@ public final class TableConfigUtils {
// Aggregation configs
List<AggregationConfig> aggregationConfigs =
ingestionConfig.getAggregationConfigs();
Set<String> aggregationSourceColumns = new HashSet<>();
- if (!CollectionUtils.isEmpty(aggregationConfigs)) {
+ if (CollectionUtils.isNotEmpty(aggregationConfigs)) {
Preconditions.checkState(!tableConfig.getIndexingConfig().isAggregateMetrics(),
"aggregateMetrics cannot be set with AggregationConfig");
Set<String> aggregationColumns = new HashSet<>();
@@ -453,8 +454,6 @@ public final class TableConfigUtils {
FunctionContext functionContext = expressionContext.getFunction();
AggregationFunctionType functionType =
AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
- validateIngestionAggregation(functionType);
-
List<ExpressionContext> arguments = functionContext.getArguments();
int numArguments = arguments.size();
if (functionType == DISTINCTCOUNTHLL) {
@@ -510,6 +509,18 @@ public final class TableConfigUtils {
Preconditions.checkState(firstArgument.getType() ==
ExpressionContext.Type.IDENTIFIER,
"First argument of aggregation function: %s must be identifier,
got: %s", functionType,
firstArgument.getType());
+ // Create a ValueAggregator for the aggregation function and check
if it is supported for ingestion (fixed
+ // size aggregated value).
+ ValueAggregator<?, ?> valueAggregator;
+ try {
+ valueAggregator =
+ ValueAggregatorFactory.getValueAggregator(functionType,
arguments.subList(1, numArguments));
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Caught exception while creating ValueAggregator for
aggregation function: " + aggregationFunction, e);
+ }
+
Preconditions.checkState(valueAggregator.isAggregatedValueFixedSize(),
+ "Aggregation function: %s must have fixed size aggregated
value", aggregationFunction);
aggregationSourceColumns.add(firstArgument.getIdentifier());
}
@@ -601,11 +612,6 @@ public final class TableConfigUtils {
}
}
- public static void validateIngestionAggregation(AggregationFunctionType
functionType) {
-
Preconditions.checkState(SUPPORTED_INGESTION_AGGREGATIONS.contains(functionType),
- "Aggregation function: %s must be one of: %s", functionType,
SUPPORTED_INGESTION_AGGREGATIONS);
- }
-
private static void validateStreamConfigMaps(TableConfig tableConfig) {
List<Map<String, String>> streamConfigMaps =
IngestionConfigUtils.getStreamConfigMaps(tableConfig);
int numStreamConfigs = streamConfigMaps.size();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
index c9bc80f8264..25e6588b412 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountCPCSketchValueAggregatorTest.java
@@ -59,6 +59,12 @@ public class DistinctCountCPCSketchValueAggregatorTest {
assertEquals(Math.round(toSketch(agg.getInitialAggregatedValue(bytes)).getEstimate()),
2);
}
+ @Test
+ public void nullInitialShouldReturnEmptySketch() {
+ DistinctCountCPCSketchValueAggregator agg = new
DistinctCountCPCSketchValueAggregator(Collections.emptyList());
+ assertEquals(toSketch(agg.getInitialAggregatedValue(null)).getEstimate(),
0.0);
+ }
+
@Test
public void applyAggregatedValueShouldUnion() {
CpcSketch input1 = new CpcSketch();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
index f7b5fc14dc9..78ba4dacbe3 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountULLValueAggregatorTest.java
@@ -80,6 +80,12 @@ public class DistinctCountULLValueAggregatorTest {
assertEquals(aggregated.getP(), 12);
}
+ @Test
+ public void nullInitialShouldReturnEmptyULL() {
+ DistinctCountULLValueAggregator agg = new
DistinctCountULLValueAggregator(Collections.emptyList());
+
assertEquals(agg.getInitialAggregatedValue(null).getDistinctCountEstimate(),
0.0);
+ }
+
@Test
public void applyAggregatedValueShouldUnion() {
UltraLogLog input1 = UltraLogLog.create(12);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
index bc95c382c87..c6dbeb6d7f4 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
@@ -43,6 +43,13 @@ public class IntegerTupleSketchValueAggregatorTest {
assertEquals(toSketch(agg.getInitialAggregatedValue(sketchContaining("hello
world", 1))).getEstimate(), 1.0);
}
+ @Test
+ public void nullInitialShouldReturnEmptySketch() {
+ IntegerTupleSketchValueAggregator agg =
+ new IntegerTupleSketchValueAggregator(Collections.emptyList(),
IntegerSummary.Mode.Sum);
+ assertEquals(toSketch(agg.getInitialAggregatedValue(null)).getEstimate(),
0.0);
+ }
+
@Test
public void applyAggregatedValueShouldUnion() {
IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
new file mode 100644
index 00000000000..d983ce259c7
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import java.util.List;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ValueAggregatorTest {
+
+ @Test(dataProvider = "fixedSizeAggregatedValue")
+ public void testFixedSizeAggregatedValue(AggregationFunctionType
functionType, List<ExpressionContext> arguments,
+ boolean expected) {
+ assertEquals(ValueAggregatorFactory.getValueAggregator(functionType,
arguments).isAggregatedValueFixedSize(),
+ expected);
+ }
+
+ @DataProvider
+ public static Object[][] fixedSizeAggregatedValue() {
+ return new Object[][]{
+ {AggregationFunctionType.COUNT, List.of(), true},
+ {AggregationFunctionType.MIN, List.of(), true},
+ {AggregationFunctionType.MAX, List.of(), true},
+ {AggregationFunctionType.SUM, List.of(), true},
+ {AggregationFunctionType.SUMPRECISION, List.of(), false},
+ {AggregationFunctionType.SUMPRECISION,
List.of(ExpressionContext.forLiteral(Literal.intValue(20))), true},
+ {AggregationFunctionType.AVG, List.of(), true},
+ {AggregationFunctionType.MINMAXRANGE, List.of(), true},
+ {AggregationFunctionType.DISTINCTCOUNTBITMAP, List.of(), false},
+ {AggregationFunctionType.DISTINCTCOUNTHLL, List.of(), true},
+ {AggregationFunctionType.DISTINCTCOUNTHLLPLUS, List.of(), true},
+ {AggregationFunctionType.DISTINCTCOUNTULL, List.of(), true},
+ {AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, List.of(), false},
+ {AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH, List.of(), true},
+ {AggregationFunctionType.DISTINCTCOUNTCPCSKETCH, List.of(), true},
+ {AggregationFunctionType.PERCENTILEEST, List.of(), false},
+ {AggregationFunctionType.PERCENTILETDIGEST, List.of(), false}
+ };
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
index eb8fbc30d14..eab8784a9e0 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
@@ -18,31 +18,29 @@
*/
package org.apache.pinot.segment.local.indexsegment.mutable;
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import org.apache.pinot.common.request.Literal;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import
org.apache.pinot.segment.local.aggregator.DistinctCountHLLValueAggregator;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.utils.BigDecimalUtils;
-import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
public class MutableSegmentImplIngestionAggregationTest {
@@ -61,18 +59,17 @@ public class MutableSegmentImplIngestionAggregationTest {
private static Schema.SchemaBuilder getSchemaBuilder() {
return new Schema.SchemaBuilder().setSchemaName("testSchema")
- .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
- .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING)
- .addDateTime(TIME_COLUMN1, FieldSpec.DataType.INT, "1:DAYS:EPOCH",
"1:DAYS")
- .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH",
"1:HOURS");
+ .addSingleValueDimension(DIMENSION_1, DataType.INT)
+ .addSingleValueDimension(DIMENSION_2, DataType.STRING)
+ .addDateTime(TIME_COLUMN1, DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+ .addDateTime(TIME_COLUMN2, DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
}
- private static final Set<String> VAR_LENGTH_SET =
Collections.singleton(DIMENSION_2);
- private static final Set<String> INVERTED_INDEX_SET =
- new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1,
TIME_COLUMN2));
+ private static final Set<String> VAR_LENGTH_SET = Set.of(DIMENSION_2);
+ private static final Set<String> INVERTED_INDEX_SET = Set.of(DIMENSION_1,
DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2);
private static final List<String> STRING_VALUES =
- Collections.unmodifiableList(Arrays.asList("aa", "bbb", "cc", "ddd",
"ee", "fff", "gg", "hhh", "ii", "jjj"));
+ List.of("aa", "bbb", "cc", "ddd", "ee", "fff", "gg", "hhh", "ii", "jjj");
@Test
public void testSameSrcDifferentAggregations()
@@ -80,12 +77,10 @@ public class MutableSegmentImplIngestionAggregationTest {
String m1 = "metric_MAX";
String m2 = "metric_MIN";
- Schema schema =
- getSchemaBuilder().addMetric(m2,
FieldSpec.DataType.DOUBLE).addMetric(m1, FieldSpec.DataType.DOUBLE).build();
+ Schema schema = getSchemaBuilder().addMetric(m2,
DataType.DOUBLE).addMetric(m1, DataType.DOUBLE).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new
HashSet<>(Arrays.asList(m2, m1)),
- VAR_LENGTH_SET, INVERTED_INDEX_SET,
- Arrays.asList(new AggregationConfig(m1, "MAX(metric)"), new
AggregationConfig(m2, "MIN(metric)")));
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "MAX(metric)"), new
AggregationConfig(m2, "MIN(metric)")));
Map<String, Double> expectedMin = new HashMap<>();
Map<String, Double> expectedMax = new HashMap<>();
@@ -98,12 +93,13 @@ public class MutableSegmentImplIngestionAggregationTest {
(Integer) metrics.get(0).getValue()));
}
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
GenericRow reuse = new GenericRow();
- for (int docId = 0; docId < expectedMax.size(); docId++) {
+ for (int docId = 0; docId < numDocsIndexed; docId++) {
GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
String key = buildKey(row);
- Assert.assertEquals(row.getValue(m2), expectedMin.get(key), key);
- Assert.assertEquals(row.getValue(m1), expectedMax.get(key), key);
+ assertEquals(row.getValue(m2), expectedMin.get(key), key);
+ assertEquals(row.getValue(m1), expectedMax.get(key), key);
}
mutableSegmentImpl.destroy();
@@ -115,12 +111,10 @@ public class MutableSegmentImplIngestionAggregationTest {
String m1 = "sum1";
String m2 = "sum2";
- Schema schema =
- getSchemaBuilder().addMetric(m1, FieldSpec.DataType.INT).addMetric(m2,
FieldSpec.DataType.LONG).build();
+ Schema schema = getSchemaBuilder().addMetric(m1,
DataType.INT).addMetric(m2, DataType.LONG).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new
HashSet<>(Arrays.asList(m2, m1)),
- VAR_LENGTH_SET, INVERTED_INDEX_SET,
- Arrays.asList(new AggregationConfig(m1, "SUM(metric)"), new
AggregationConfig(m2, "SUM(metric_2)")));
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "SUM(metric)"), new
AggregationConfig(m2, "SUM(metric_2)")));
Map<String, Integer> expectedSum1 = new HashMap<>();
Map<String, Long> expectedSum2 = new HashMap<>();
@@ -131,38 +125,44 @@ public class MutableSegmentImplIngestionAggregationTest {
expectedSum2.getOrDefault(metrics.get(1).getKey(), 0L) + ((Integer)
metrics.get(1).getValue()).longValue());
}
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
GenericRow reuse = new GenericRow();
- for (int docId = 0; docId < expectedSum1.size(); docId++) {
+ for (int docId = 0; docId < numDocsIndexed; docId++) {
GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
String key = buildKey(row);
- Assert.assertEquals(row.getValue(m1), expectedSum1.get(key), key);
- Assert.assertEquals(row.getValue(m2), expectedSum2.get(key), key);
+ assertEquals(row.getValue(m1), expectedSum1.get(key), key);
+ assertEquals(row.getValue(m2), expectedSum2.get(key), key);
}
mutableSegmentImpl.destroy();
}
@Test
- public void testValuesAreNullThrowsException()
+ public void testNullValues()
throws Exception {
String m1 = "sum1";
- Schema schema = getSchemaBuilder().addMetric(m1,
FieldSpec.DataType.INT).build();
+ Schema schema = getSchemaBuilder().addMetric(m1, DataType.INT).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
Collections.singleton(m1), VAR_LENGTH_SET,
- INVERTED_INDEX_SET, Collections.singletonList(new
AggregationConfig(m1, "SUM(metric)")));
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema,
Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "SUM(metric)")));
long seed = 2;
Random random = new Random(seed);
- // Generate random int to prevent overflow
- GenericRow row = getRow(random, 1);
- row.putValue(METRIC, null);
- try {
+ for (int i = 0; i < NUM_ROWS; i++) {
+ GenericRow row = getRow(random, 1);
mutableSegmentImpl.index(row, METADATA);
- Assert.fail();
- } catch (NullPointerException e) {
- // expected
+ }
+
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+ assertTrue(numDocsIndexed < NUM_ROWS);
+
+ GenericRow reuse = new GenericRow();
+ for (int i = 0; i < numDocsIndexed; i++) {
+ GenericRow row = mutableSegmentImpl.getRecord(i, reuse);
+ String key = buildKey(row);
+ assertEquals(row.getValue(m1), 0, key);
}
mutableSegmentImpl.destroy();
@@ -173,58 +173,25 @@ public class MutableSegmentImplIngestionAggregationTest {
throws Exception {
String m1 = "hll1";
- Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BYTES).build();
+ Schema schema = getSchemaBuilder().addMetric(m1, DataType.BYTES).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
- INVERTED_INDEX_SET, Collections.singletonList(new AggregationConfig(m1, "distinctCountHLL(metric, 12)")));
-
- Map<String, HLLTestData> expected = new HashMap<>();
- List<Metric> metrics = addRowsDistinctCountHLL(998, mutableSegmentImpl);
- for (Metric metric : metrics) {
- expected.put(metric.getKey(), (HLLTestData) metric.getValue());
- }
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "distinctCountHLL(metric, 12)")));
- List<ExpressionContext> arguments = List.of(ExpressionContext.forLiteral(Literal.stringValue("12")));
- DistinctCountHLLValueAggregator valueAggregator = new DistinctCountHLLValueAggregator(arguments);
+ Map<String, HyperLogLog> expectedValues = addRowsDistinctCountHLL(998, mutableSegmentImpl);
- Set<Integer> integers = new HashSet<>();
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+ assertTrue(numDocsIndexed < NUM_ROWS);
+ assertEquals(numDocsIndexed, expectedValues.size());
- // Assert that the distinct count is within an error margin. We assert on the cardinality of the HLL in the docID
- // and the HLL we made, but also on the cardinality of the HLL in the docID and the actual cardinality from the set
- // of integers.
GenericRow reuse = new GenericRow();
- for (int docId = 0; docId < expected.size(); docId++) {
+ for (int docId = 0; docId < numDocsIndexed; docId++) {
GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
- String key = buildKey(row);
-
- integers.addAll(expected.get(key)._integers);
-
- HyperLogLog expectedHLL = expected.get(key)._hll;
- HyperLogLog actualHLL = valueAggregator.deserializeAggregatedValue((byte[]) row.getValue(m1));
-
- Assert.assertEquals(actualHLL.cardinality(), expectedHLL.cardinality(), (int) (expectedHLL.cardinality() * 0.04),
- "The HLL cardinality from the index is within a tolerable error margin (4%) of the cardinality of the "
- + "expected HLL.");
- Assert.assertEquals(actualHLL.cardinality(), expected.get(key)._integers.size(),
- expected.get(key)._integers.size() * 0.04,
- "The HLL cardinality from the index is within a tolerable error margin (4%) of the actual cardinality of "
- + "the integers.");
+ HyperLogLog actualHLL = CustomSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) row.getValue(m1));
+ HyperLogLog expectedHLL = expectedValues.get(buildKey(row));
+ assertEquals(actualHLL.cardinality(), expectedHLL.cardinality());
}
- // Assert that the aggregated HyperLogLog is also within the error margin
- HyperLogLog togetherHLL = new HyperLogLog(12);
- expected.forEach((key, value) -> {
- try {
- togetherHLL.addAll(value._hll);
- } catch (CardinalityMergeException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- });
-
- Assert.assertEquals(togetherHLL.cardinality(), integers.size(), (int) (integers.size() * 0.04),
- "The aggregated HLL cardinality is within a tolerable error margin (4%) of the actual cardinality of the "
- + "integers.");
mutableSegmentImpl.destroy();
}
@@ -234,24 +201,23 @@ public class MutableSegmentImplIngestionAggregationTest {
String m1 = "count1";
String m2 = "count2";
- Schema schema =
- getSchemaBuilder().addMetric(m1, FieldSpec.DataType.LONG).addMetric(m2, FieldSpec.DataType.LONG).build();
+ Schema schema = getSchemaBuilder().addMetric(m1, DataType.LONG).addMetric(m2, DataType.LONG).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(m1, m2)),
- VAR_LENGTH_SET, INVERTED_INDEX_SET,
- Arrays.asList(new AggregationConfig(m1, "COUNT(metric)"), new AggregationConfig(m2, "COUNT(*)")));
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1, m2), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "COUNT(metric)"), new AggregationConfig(m2, "COUNT(*)")));
Map<String, Long> expectedCount = new HashMap<>();
for (List<Metric> metrics : addRows(3, mutableSegmentImpl)) {
expectedCount.put(metrics.get(0).getKey(), expectedCount.getOrDefault(metrics.get(0).getKey(), 0L) + 1L);
}
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
GenericRow reuse = new GenericRow();
- for (int docId = 0; docId < expectedCount.size(); docId++) {
+ for (int docId = 0; docId < numDocsIndexed; docId++) {
GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
String key = buildKey(row);
- Assert.assertEquals(row.getValue(m1), expectedCount.get(key), key);
- Assert.assertEquals(row.getValue(m2), expectedCount.get(key), key);
+ assertEquals(row.getValue(m1), expectedCount.get(key), key);
+ assertEquals(row.getValue(m2), expectedCount.get(key), key);
}
mutableSegmentImpl.destroy();
@@ -262,7 +228,7 @@ public class MutableSegmentImplIngestionAggregationTest {
TIME_COLUMN1) + KEY_SEPARATOR + row.getValue(TIME_COLUMN2);
}
- private GenericRow getRow(Random random, Integer multiplicationFactor) {
+ private GenericRow getRow(Random random, int multiplicationFactor) {
GenericRow row = new GenericRow();
row.putValue(DIMENSION_1, random.nextInt(2 * multiplicationFactor));
@@ -273,126 +239,66 @@ public class MutableSegmentImplIngestionAggregationTest {
return row;
}
- private class HLLTestData {
- private HyperLogLog _hll;
- private Set<Integer> _integers;
-
- public HLLTestData(HyperLogLog hll, Set<Integer> integers) {
- _hll = hll;
- _integers = integers;
- }
-
- public HyperLogLog getHll() {
- return _hll;
- }
-
- public Set<Integer> getIntegers() {
- return _integers;
- }
- }
-
- private class Metric {
- private final String _key;
- private final Object _value;
+ private static class Metric {
+ final String _key;
+ final Object _value;
Metric(String key, Object value) {
_key = key;
_value = value;
}
- public String getKey() {
+ String getKey() {
return _key;
}
- public Object getValue() {
+ Object getValue() {
return _value;
}
}
- private List<Metric> addRowsDistinctCountHLL(long seed, MutableSegmentImpl mutableSegmentImpl)
+ private Map<String, HyperLogLog> addRowsDistinctCountHLL(long seed, MutableSegmentImpl mutableSegmentImpl)
throws Exception {
- List<Metric> metrics = new ArrayList<>();
+ Map<String, HyperLogLog> valueMap = new HashMap<>();
Random random = new Random(seed);
-
- HashMap<String, HyperLogLog> hllMap = new HashMap<>();
- HashMap<String, Set<Integer>> distinctMap = new HashMap<>();
-
- Integer rows = 500000;
-
- for (int i = 0; i < (rows); i++) {
+ for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = getRow(random, 1);
String key = buildKey(row);
int metricValue = random.nextInt(5000000);
row.putValue(METRIC, metricValue);
-
- if (hllMap.containsKey(key)) {
- hllMap.get(key).offer(row.getValue(METRIC));
- distinctMap.get(key).add(metricValue);
- } else {
- HyperLogLog hll = new HyperLogLog(12);
- hll.offer(row.getValue(METRIC));
- hllMap.put(key, hll);
- distinctMap.put(key, new HashSet<>(metricValue));
- }
-
mutableSegmentImpl.index(row, METADATA);
- }
- distinctMap.forEach(
- (key, value) -> metrics.add(new Metric(key, new HLLTestData(hllMap.get(key), distinctMap.get(key)))));
-
- int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
- Assert.assertEquals(numDocsIndexed, hllMap.keySet().size());
-
- // Assert that aggregation happened.
- Assert.assertTrue(numDocsIndexed < NUM_ROWS);
+ valueMap.computeIfAbsent(key, k -> new HyperLogLog(12)).offer(metricValue);
+ }
- return metrics;
+ return valueMap;
}
- private List<Metric> addRowsSumPrecision(long seed, MutableSegmentImpl mutableSegmentImpl)
+ private Map<String, BigDecimal> addRowsSumPrecision(long seed, MutableSegmentImpl mutableSegmentImpl)
throws Exception {
- List<Metric> metrics = new ArrayList<>();
+ Map<String, BigDecimal> valueMap = new HashMap<>();
Random random = new Random(seed);
-
- HashMap<String, BigDecimal> bdMap = new HashMap<>();
- HashMap<String, ArrayList<BigDecimal>> bdIndividualMap = new HashMap<>();
-
- int numRows = 50000;
- for (int i = 0; i < numRows; i++) {
+ for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = getRow(random, 1);
String key = buildKey(row);
BigDecimal metricValue = generateRandomBigDecimal(random, 5, 6);
- row.putValue(METRIC, metricValue.toString());
-
- if (bdMap.containsKey(key)) {
- bdMap.put(key, bdMap.get(key).add(metricValue));
- bdIndividualMap.get(key).add(metricValue);
- } else {
- bdMap.put(key, metricValue);
- ArrayList<BigDecimal> bdList = new ArrayList<>();
- bdList.add(metricValue);
- bdIndividualMap.put(key, bdList);
- }
-
+ row.putValue(METRIC, metricValue);
mutableSegmentImpl.index(row, METADATA);
- }
- for (String key : bdMap.keySet()) {
- metrics.add(new Metric(key, bdMap.get(key)));
+ valueMap.compute(key, (k, v) -> {
+ if (v == null) {
+ return metricValue;
+ } else {
+ return v.add(metricValue);
+ }
+ });
}
- int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
- Assert.assertEquals(numDocsIndexed, bdMap.keySet().size());
-
- // Assert that aggregation happened.
- Assert.assertTrue(numDocsIndexed < NUM_ROWS);
-
- return metrics;
+ return valueMap;
}
private List<List<Metric>> addRows(long seed, MutableSegmentImpl mutableSegmentImpl)
@@ -403,7 +309,6 @@ public class MutableSegmentImplIngestionAggregationTest {
Random random = new Random(seed);
for (int i = 0; i < NUM_ROWS; i++) {
- // Generate random int to prevent overflow
GenericRow row = getRow(random, 1);
Integer metricValue = random.nextInt(10000);
Integer metric2Value = random.nextInt();
@@ -413,15 +318,15 @@ public class MutableSegmentImplIngestionAggregationTest {
mutableSegmentImpl.index(row, METADATA);
String key = buildKey(row);
- metrics.add(Arrays.asList(new Metric(key, metricValue), new Metric(key, metric2Value)));
+ metrics.add(List.of(new Metric(key, metricValue), new Metric(key, metric2Value)));
keys.add(key);
}
int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
- Assert.assertEquals(numDocsIndexed, keys.size());
+ assertEquals(numDocsIndexed, keys.size());
// Assert that aggregation happened.
- Assert.assertTrue(numDocsIndexed < NUM_ROWS);
+ assertTrue(numDocsIndexed < NUM_ROWS);
return metrics;
}
@@ -430,70 +335,61 @@ public class MutableSegmentImplIngestionAggregationTest {
public void testSumPrecision()
throws Exception {
String m1 = "sumPrecision1";
- Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BIG_DECIMAL).build();
+ Schema schema = getSchemaBuilder().addMetric(m1, DataType.BIG_DECIMAL).build();
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
- INVERTED_INDEX_SET,
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
// Setting precision to 38 in the arguments for SUM_PRECISION
- Collections.singletonList(new AggregationConfig(m1, "SUM_PRECISION(metric, 38)")));
+ List.of(new AggregationConfig(m1, "SUM_PRECISION(metric, 38)")));
- Map<String, BigDecimal> expected = new HashMap<>();
- List<Metric> metrics = addRowsSumPrecision(998, mutableSegmentImpl);
- for (Metric metric : metrics) {
- expected.put(metric.getKey(), (BigDecimal) metric.getValue());
- }
+ Map<String, BigDecimal> expectedValues = addRowsSumPrecision(998, mutableSegmentImpl);
+
+ int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
+ assertTrue(numDocsIndexed < NUM_ROWS);
+ assertEquals(numDocsIndexed, expectedValues.size());
- // Assert that the aggregated values are correct
GenericRow reuse = new GenericRow();
- for (int docId = 0; docId < expected.size(); docId++) {
+ for (int docId = 0; docId < numDocsIndexed; docId++) {
GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
- String key = buildKey(row);
-
- BigDecimal expectedBigDecimal = expected.get(key);
BigDecimal actualBigDecimal = (BigDecimal) row.getValue(m1);
-
- Assert.assertEquals(actualBigDecimal, expectedBigDecimal, "The aggregated SUM does not match the expected SUM");
+ BigDecimal expectedBigDecimal = expectedValues.get(buildKey(row));
+ assertEquals(actualBigDecimal, expectedBigDecimal);
}
+
mutableSegmentImpl.destroy();
}
@Test
public void testBigDecimalTooBig() {
String m1 = "sumPrecision1";
- Schema schema = getSchemaBuilder().addMetric(m1, FieldSpec.DataType.BIG_DECIMAL).build();
+ Schema schema = getSchemaBuilder().addMetric(m1, DataType.BIG_DECIMAL).build();
int seed = 1;
Random random = new Random(seed);
MutableSegmentImpl mutableSegmentImpl =
- MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Collections.singleton(m1), VAR_LENGTH_SET,
- INVERTED_INDEX_SET, Collections.singletonList(new AggregationConfig(m1, "SUM_PRECISION(metric, 3)")));
+ MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, Set.of(m1), VAR_LENGTH_SET, INVERTED_INDEX_SET,
+ List.of(new AggregationConfig(m1, "SUM_PRECISION(metric, 3)")));
// Make a big decimal larger than 3 precision and try to index it
BigDecimal large = BigDecimalUtils.generateMaximumNumberWithPrecision(5);
GenericRow row = getRow(random, 1);
row.putValue("metric", large);
- Assert.assertThrows(IllegalArgumentException.class, () -> {
+ assertThrows(IllegalArgumentException.class, () -> {
mutableSegmentImpl.index(row, METADATA);
});
mutableSegmentImpl.destroy();
}
- private BigDecimal generateRandomBigDecimal(Random random, int maxPrecision, int scale) {
+ private static BigDecimal generateRandomBigDecimal(Random random, int maxPrecision, int scale) {
int precision = 1 + random.nextInt(maxPrecision);
-
- String s = "";
+ StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < precision; i++) {
- s = s + (1 + random.nextInt(9));
- }
-
- if ((1 + random.nextInt(2)) == 1) {
- return (new BigDecimal(s).setScale(scale)).negate();
- } else {
- return new BigDecimal(s).setScale(scale);
+ stringBuilder.append(1 + random.nextInt(9));
}
+ BigDecimal bigDecimal = new BigDecimal(stringBuilder.toString()).setScale(scale, RoundingMode.UNNECESSARY);
+ return random.nextBoolean() ? bigDecimal : bigDecimal.negate();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]