Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r872883391


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import groovy.lang.Tuple2;

Review Comment:
   (minor) Let's use `org.apache.commons.lang3.tuple.Pair` instead of `groovy.lang.Tuple2`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -336,10 +353,15 @@ public long getLatestIngestionTimestamp() {
       // Null value vector
       MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
 
+      String sourceColumn = metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getFirst() : column;
+      ValueAggregator valueAggregator =
+          metricsAggregators.containsKey(column) ? metricsAggregators.get(column).getSecond() : null;

Review Comment:
   Reduce the map lookups
   ```suggestion
         Pair<String, ValueAggregator> columnAggregatorPair = metricsAggregators.get(column);
         String sourceColumn = columnAggregatorPair != null ? columnAggregatorPair.getLeft() : null;
         ValueAggregator valueAggregator = columnAggregatorPair != null ? columnAggregatorPair.getRight() : null;
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -610,9 +664,13 @@ private void addNewRow(int docId, GenericRow row) {
                   "Unsupported data type: " + dataType + " for no-dictionary column: " + column);
           }
 
+          if (column.equals("*")) {

Review Comment:
   We should never hit this branch



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1125,7 +1284,8 @@ private class IndexContainer implements Closeable {
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex textIndex,
         @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+        @Nullable MutableNullValueVector nullValueVector, String sourceColumn,

Review Comment:
   (minor)
   ```suggestion
           @Nullable MutableNullValueVector nullValueVector, @Nullable String sourceColumn,
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {

Review Comment:
   Move this schema check after the null check on `columnName` below, or it will throw NPE when `columnName` is null
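
   A minimal sketch of the ordering, using hypothetical stand-ins (`AggregationConfigCheckSketch`, `FIELD_TYPES`, `validate`) rather than the real Pinot classes: the null check runs first, so a null name is reported with the intended message before any schema lookup is attempted.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class AggregationConfigCheckSketch {
     // Hypothetical stand-in for the schema: column name -> field type.
     static final Map<String, String> FIELD_TYPES = new HashMap<>();

     static {
       FIELD_TYPES.put("totalSales", "METRIC");
       FIELD_TYPES.put("country", "DIMENSION");
     }

     static void validate(String columnName, String aggregationFunction) {
       // 1. Null check first, so later lookups never see a null column name.
       if (columnName == null || aggregationFunction == null) {
         throw new IllegalStateException("columnName/aggregationFunction cannot be null in AggregationConfig");
       }
       // 2. Schema checks only after the inputs are known to be non-null.
       String fieldType = FIELD_TYPES.get(columnName);
       if (fieldType == null) {
         throw new IllegalStateException("The destination column '" + columnName + "' must be present in the schema");
       }
       if (!"METRIC".equals(fieldType)) {
         throw new IllegalStateException("The destination column '" + columnName + "' must be a metric column");
       }
     }
   }
   ```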



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {
+        Preconditions.checkState(
+            !tableConfig.getIndexingConfig().isAggregateMetrics(),
+            "aggregateMetrics cannot be set with AggregationConfig");
+        Set<String> aggregationColumns = new HashSet<>();
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          String columnName = aggregationConfig.getColumnName();
+          if (schema != null) {
+            FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+            Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName
+                + "' of the aggregation function must be present in the schema");
+            Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC,
+                "The destination column '" + columnName + "' of the aggregation function must be a metric column");
+          }
+          String aggregationFunction = aggregationConfig.getAggregationFunction();
+          if (columnName == null || aggregationFunction == null) {
+            throw new IllegalStateException(
+                "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig);
+          }
+
+          if (!aggregationColumns.add(columnName)) {
+            throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'");
+          }
+          ExpressionContext expressionContext;
+          try {
+            expressionContext = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          } catch (Exception e) {
+            throw new IllegalStateException(
+                "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e);
+          }
+          Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
+              "aggregation function must be a function for: %s", aggregationConfig);
+
+          FunctionContext functionContext = expressionContext.getFunction();
+          validateIngestionAggregation(functionContext.getFunctionName());
+          Preconditions.checkState(functionContext.getArguments().size() == 1,
+              "aggregation function can only have one argument: %s", aggregationConfig);
+
+          ExpressionContext argument = functionContext.getArguments().get(0);
+          Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
+              "aggregator function argument must be a identifier: %s", aggregationConfig);
+
+          aggregationSourceColumns.add(argument.getIdentifier());
+        }
+        if (schema != null) {
+          Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns),

Review Comment:
   Should we consider defaulting to `SUM` on the metric itself when no aggregation is configured for it?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -533,6 +556,37 @@ private void addNewRow(int docId, GenericRow row) {
       String column = entry.getKey();
       IndexContainer indexContainer = entry.getValue();
 
+      // aggregate metrics is enabled.
+      if (indexContainer._valueAggregator != null) {
+        Object value = row.getValue(indexContainer._sourceColumn);
+
+        // Update numValues info
+        indexContainer._numValuesInfo.updateSVEntry();
+
+        MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+        FieldSpec fieldSpec = indexContainer._fieldSpec;
+
+        DataType dataType = fieldSpec.getDataType();
+        value = indexContainer._valueAggregator.getInitialAggregatedValue(value);
+        switch (dataType.getStoredType()) {
+          case INT:
+            forwardIndex.setInt(docId, ((Number) value).intValue());
+            break;
+          case LONG:
+            forwardIndex.setLong(docId, ((Number) value).longValue());
+            break;
+          case FLOAT:
+            forwardIndex.setFloat(docId, ((Number) value).floatValue());
+            break;
+          case DOUBLE:
+            forwardIndex.setDouble(docId, ((Number) value).doubleValue());
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported data type: " + dataType + " for aggregation: " + column);
+        }

Review Comment:
   Should we `continue` here?
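
   If the answer is yes, the control flow would look roughly like the sketch below. All names here (`AddNewRowSketch`, `indexRow`, the boolean "has aggregator" flag) are hypothetical stand-ins, not the real `MutableSegmentImpl` code: once the aggregated value is written, `continue` skips the regular indexing path for that column.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   class AddNewRowSketch {
     // Returns the steps taken per column; the map value says whether the
     // column has a value aggregator configured (hypothetical model).
     static List<String> indexRow(Map<String, Boolean> columnHasAggregator) {
       List<String> steps = new ArrayList<>();
       for (Map.Entry<String, Boolean> entry : columnHasAggregator.entrySet()) {
         String column = entry.getKey();
         if (entry.getValue()) {
           // Aggregated metric: write the initial aggregated value and stop here.
           steps.add("aggregate:" + column);
           continue; // do not fall through to the regular indexing path
         }
         // Regular column: dictionary / forward index handling would go here.
         steps.add("index:" + column);
       }
       return steps;
     }
   }
   ```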



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +424,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
     }
   }
 
+  /**
+   * Currently only, ValueAggregators with fixed width types are allowed, so MIN, MAX, SUM, and COUNT. The reason
+   * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+   * supports random inserts and lookups. The
+   * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex only supports
+   * sequential inserts.
+   */
+  public static void validateIngestionAggregation(String name) {
+    List<AggregationFunctionType> allowed =

Review Comment:
   Make this a constant `EnumSet` (`private static final EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS`)
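
   A rough sketch of what that could look like. `FunctionType` is a hypothetical stand-in for `AggregationFunctionType` with only a few values, and the name parsing and error messages are assumptions; the supported set (MIN, MAX, SUM, COUNT) is taken from the Javadoc in this diff.

   ```java
   import java.util.EnumSet;
   import java.util.Locale;

   class IngestionAggregationSketch {
     // Hypothetical stand-in for AggregationFunctionType; only a few values shown.
     enum FunctionType { MIN, MAX, SUM, COUNT, DISTINCTCOUNT }

     // Built once at class load instead of on every validation call.
     private static final EnumSet<FunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
         EnumSet.of(FunctionType.MIN, FunctionType.MAX, FunctionType.SUM, FunctionType.COUNT);

     static void validateIngestionAggregation(String name) {
       FunctionType type;
       try {
         type = FunctionType.valueOf(name.toUpperCase(Locale.ROOT));
       } catch (IllegalArgumentException e) {
         throw new IllegalStateException("Unknown aggregation function: " + name, e);
       }
       if (!SUPPORTED_INGESTION_AGGREGATIONS.contains(type)) {
         throw new IllegalStateException(
             name + " is not supported for ingestion aggregation, supported: " + SUPPORTED_INGESTION_AGGREGATIONS);
       }
     }
   }
   ```

   `EnumSet.contains` is a constant-time bit check, and the constant also documents the supported functions in one place.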



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc
         }
       }
 
+      // Aggregation configs
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      Set<String> aggregationSourceColumns = new HashSet<>();
+      if (aggregationConfigs != null) {

Review Comment:
   To be consistent with the check in the mutable segment
   ```suggestion
         if (!CollectionUtils.isEmpty(aggregationConfigs)) {
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1098,6 +1200,61 @@ void updateMVEntry(int numValuesInMVEntry) {
     }
   }
 
+  static class IngestionAggregator {

Review Comment:
   (minor) I don't see much value in this inner util class. Suggest removing it and renaming the method to `getMetricsAggregators()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]