Jackie-Jiang commented on code in PR #10926:
URL: https://github.com/apache/pinot/pull/10926#discussion_r1260315154


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements 
ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {
+  }
+
+  public DistinctCountHLLValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we use the default _log2m of 8
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    try {
+      String log2mLit = arguments.get(1).getLiteral().getStringValue();
+      Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m 
argument must be a numeric literal");
+
+      _log2m = Integer.parseInt(log2mLit);

Review Comment:
   (minor)
   ```suggestion
         _log2m = arguments.get(1).getLiteral().getIntValue();
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -255,6 +255,14 @@ public boolean isMutableSegment() {
 
     // Initialize for each column
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
+      String fieldSpecName = fieldSpec.getName();
+      if (metricsAggregators.containsKey(fieldSpecName)) {
+        int maxLength = 
metricsAggregators.get(fieldSpecName).getRight().getMaxAggregatedValueByteSize();
+        if (maxLength > 0) {
+          fieldSpec.setMaxLength(maxLength);

Review Comment:
   This will update the original schema, and can potentially cause unexpected 
behavior. Ideally schema should not be modified 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/BaseSingleTreeBuilder.java:
##########
@@ -142,7 +143,8 @@ static class Record {
     for (AggregationFunctionColumnPair functionColumnPair : 
functionColumnPairs) {
       _metrics[index] = functionColumnPair.toColumnName();
       _functionColumnPairs[index] = functionColumnPair;
-      _valueAggregators[index] = 
ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType());
+      _valueAggregators[index] =
+          
ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), 
Collections.EMPTY_LIST);

Review Comment:
   (nit)
   ```suggestion
             
ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), 
Collections.emptyList());
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -100,6 +124,9 @@ public byte[] serializeAggregatedValue(HyperLogLog value) {
 
   @Override
   public HyperLogLog deserializeAggregatedValue(byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      return new HyperLogLog(_log2m);
+    }

Review Comment:
   I don't follow. Even if it is used in `getInitialAggregatedValue()`, the 
input should never be null or empty. Are we trying to handle invalid input data 
(e.g. empty byte array)? If so, the handling should be added to 
`getInitialAggregatedValue()` and `applyRawValue()` instead of here



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements 
ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {
+  }
+
+  public DistinctCountHLLValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we use the default _log2m of 8
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    try {
+      String log2mLit = arguments.get(1).getLiteral().getStringValue();
+      Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m 
argument must be a numeric literal");
+
+      _log2m = Integer.parseInt(log2mLit);
+      _log2mByteSize = (new HyperLogLog(_log2m)).getBytes().length;

Review Comment:
   We can add a util to get the byte size without serializing:
   `byteSize = (RegisterSet.getSizeForCount(1 << log2m) + 2) * Integer.BYTES`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java:
##########
@@ -28,6 +32,29 @@ public class SumPrecisionValueAggregator implements 
ValueAggregator<Object, BigD
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
 
   private int _maxByteSize;
+  private int _fixedSize = -1;
+
+  public SumPrecisionValueAggregator() {
+  }
+
+  /*
+  Aggregate with a optimal maximum precision in mind. Scale is always only 1 
32-bit
+  int and the storing of the scale value does not affect the size of the big 
decimal.
+  Given this, we won't care about scale in terms of the aggregations.
+  During query time, the optional scale parameter can be provided, but during 
aggregation,
+  we don't limit it.
+   */
+  public SumPrecisionValueAggregator(List<ExpressionContext> arguments) {
+    // length 1 means we don't have any caps on maximum precision nor do we 
have a fixed size then
+    if (arguments.size() <= 1) {
+      return;
+    }
+
+    String precision = arguments.get(1).getLiteral().getStringValue();
+    Preconditions.checkState(StringUtils.isNumeric(precision), "precision must 
be a numeric literal");
+
+    _fixedSize = 
BigDecimalUtils.byteSizeForFixedPrecision(Integer.parseInt(precision));

Review Comment:
   ```suggestion
       _fixedSize = 
BigDecimalUtils.byteSizeForFixedPrecision(arguments.get(1).getLiteral().getIntValue());
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -288,6 +296,7 @@ public boolean isMutableSegment() {
       // Only support generating raw index on single-value columns that do not 
have inverted index while
       // consuming. After consumption completes and the segment is built, all 
single-value columns can have raw index
 
+

Review Comment:
   (nit) extra empty line



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1211,17 +1230,29 @@ private static Map<String, Pair<String, 
ValueAggregator>> fromAggregationConfig(
           "aggregation function must be a function: %s", config);
       FunctionContext functionContext = expressionContext.getFunction();
       
TableConfigUtils.validateIngestionAggregation(functionContext.getFunctionName());
-      Preconditions.checkState(functionContext.getArguments().size() == 1,
-          "aggregation function can only have one argument: %s", config);
+

Review Comment:
   IMO checking whether there are `>= 1` arguments is enough. Seems the check 
is already applied in the `TableConfigUtils`, so we can actually remove this 
check



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -378,12 +386,75 @@ public static void validateIngestionConfig(TableConfig 
tableConfig, @Nullable Sc
 
           FunctionContext functionContext = expressionContext.getFunction();
           validateIngestionAggregation(functionContext.getFunctionName());
-          Preconditions.checkState(functionContext.getArguments().size() == 1,
-              "aggregation function can only have one argument: %s", 
aggregationConfig);
+
+          List<ExpressionContext> arguments = functionContext.getArguments();
+
+          if (("distinctcounthll".equals(functionContext.getFunctionName()))

Review Comment:
   We need to use canonical name (removing underscore). Currently if the 
function name is `distinct_count_hll` or `sum_precision` it will fail



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteSVMutableForwardIndex.java:
##########
@@ -195,6 +205,22 @@ public void setDouble(int docId, double value) {
     getWriterForRow(docId).setDouble(docId, value);
   }
 
+  @Override
+  public byte[] getBytes(int docId) {
+    int bufferId = getBufferId(docId);
+    return _readers.get(bufferId).getBytes(docId);
+  }
+
+  @Override
+  public void setBytes(int docId, byte[] value) {
+    if (value.length != _valueSizeInBytes) {
+      throw new IllegalArgumentException("Expected value size to be " + 
_valueSizeInBytes + " but was " + value.length);
+    }

Review Comment:
   (minor)
   ```suggestion
       Preconditions.checkArgument(value.length == _valueSizeInBytes, "Expected 
value size to be: %s but got: %s ", _valueSizeInBytes, value.length);
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java:
##########
@@ -28,11 +33,31 @@
 
 public class DistinctCountHLLValueAggregator implements 
ValueAggregator<Object, HyperLogLog> {
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
-  private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;
-
+  private int _log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
+  private int _log2mByteSize = 180;
   // Byte size won't change once we get the initial aggregated value
   private int _maxByteSize;
 
+  public DistinctCountHLLValueAggregator() {

Review Comment:
   (minor) This constructor is not used in production code. Should we consider 
modifying the test usage and remove it?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumPrecisionValueAggregator.java:
##########
@@ -28,6 +32,29 @@ public class SumPrecisionValueAggregator implements 
ValueAggregator<Object, BigD
   public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
 
   private int _maxByteSize;
+  private int _fixedSize = -1;
+
+  public SumPrecisionValueAggregator() {
+  }
+
+  /*
+  Aggregate with a optimal maximum precision in mind. Scale is always only 1 
32-bit

Review Comment:
   (code format) We usually indent (add 2 spaces) the block comment



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1193,8 +1212,8 @@ private static Map<String, Pair<String, ValueAggregator>> 
fromAggregateMetrics(R
 
     Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new 
HashMap<>();
     for (String metricName : segmentConfig.getSchema().getMetricNames()) {
-      columnNameToAggregator.put(metricName,
-          Pair.of(metricName, 
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM)));
+      columnNameToAggregator.put(metricName, Pair.of(metricName,
+          
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, 
Collections.EMPTY_LIST)));

Review Comment:
   (nit) We usually use
   ```suggestion
             
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, 
Collections.emptyList())));
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -269,13 +272,14 @@ public MutableIndex 
createMutableIndex(MutableIndexContext context, ForwardIndex
     String column = context.getFieldSpec().getName();
     String segmentName = context.getSegmentName();
     FieldSpec.DataType storedType = 
context.getFieldSpec().getDataType().getStoredType();
+    int maxLength = context.getFieldSpec().getMaxLength();

Review Comment:
   (MAJOR) I don't think this is the correct way to pass this information. We 
can probably add the fixed length info into the `MutableIndexContext` to avoid 
modifying the field spec



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteSVMutableForwardIndex.java:
##########
@@ -63,22 +63,32 @@ public class FixedByteSVMutableForwardIndex implements 
MutableForwardIndex {
 
   /**
    * @param storedType Data type of the values
+   * @param fixedLength Fixed length of values if known: only used for BYTES 
field and Hyperloglog values for now.
    * @param numRowsPerChunk Number of rows to pack in one chunk before a new 
chunk is created.
    * @param memoryManager Memory manager to be used for allocating memory.
    * @param allocationContext Allocation allocationContext.
    */
-  public FixedByteSVMutableForwardIndex(boolean dictionaryEncoded, DataType 
storedType, int numRowsPerChunk,
-      PinotDataBufferMemoryManager memoryManager, String allocationContext) {
+  public FixedByteSVMutableForwardIndex(boolean dictionaryEncoded, DataType 
storedType, int fixedLength,
+      int numRowsPerChunk, PinotDataBufferMemoryManager memoryManager, String 
allocationContext) {
     _dictionaryEncoded = dictionaryEncoded;
     _storedType = storedType;
-    _valueSizeInBytes = storedType.size();
+    if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) {
+      _valueSizeInBytes = fixedLength;

Review Comment:
   Let's add a check here verifying `fixedLength` is positive



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -621,6 +631,10 @@ private void addNewRow(int docId, GenericRow row) {
           case DOUBLE:
             forwardIndex.add(((Number) value).doubleValue(), -1, docId);
             break;
+          case BIG_DECIMAL:

Review Comment:
   The above comment no longer apply.
   We should probably add some comment about using byte[] to support 
`BIG_DECIMAL`. It works because `BIG_DECIMAL` is actually stored as byte[] 
underlying



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/SumValueAggregator.java:
##########
@@ -37,11 +37,17 @@ public DataType getAggregatedValueType() {
 
   @Override
   public Double getInitialAggregatedValue(Number rawValue) {
+    if (rawValue == null) {

Review Comment:
   Currently the input should never be `null` (it should have already been 
filled with default value). My concern is that we are adding `null` handling to 
only this aggregation but not others. In order to completely support `null` 
input, we need to allow null value in, and annotate the input value as 
`@Nullable` and support it for all aggregations. That is not in the scope of 
this PR, so suggest doing it separately



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to