Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r870638029


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -196,7 +198,7 @@ public long getLatestIngestionTimestamp() {
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
-    _aggregateMetrics = config.aggregateMetrics();
+    _ingestionAggregator = IngestionAggregator.fromRealtimeSegmentConfig(config);

Review Comment:
   Let's initialize `_ingestionAggregator` only when `_recordIdMap` is set. We want to perform the checks before setting it up.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *
+   * This helper function checks if the value is null, which is only applicable if the column isn't "*",
+   * which is a side effect of metrics aggregation "COUNT" operation.
+   *
+   * @param column
+   * @param value
+   * @return is the value null if the column is an actual data column (not "*").
+   */
+  private boolean isColumnValueNull(String column, Object value) {
+    return !column.equals("*") && value == null;
+  }
+
   private void updateDictionary(GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   This is not required. All the metrics must be raw (non-dictionary-encoded). We should check `dictionary != null` before reading the value and running the null check.
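
As a rough illustration of that ordering, here is a minimal, self-contained sketch. `Container` and `updateDictionary` below are hypothetical stand-ins (not Pinot's `IndexContainer` or the actual method), and a plain `Set` plays the role of the dictionary. The point is only that columns without a dictionary are skipped before the row value is read or null-checked, so raw metric columns never need a name translation here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class DictionaryGuardSketch {
  /** Hypothetical per-column container; the dictionary is null for raw columns. */
  static final class Container {
    final Set<Object> dictionary;

    Container(boolean dictionaryEncoded) {
      dictionary = dictionaryEncoded ? new HashSet<Object>() : null;
    }
  }

  /** Indexes row values into dictionaries, skipping raw columns before any value read. */
  static void updateDictionary(Map<String, Container> containers, Map<String, Object> row) {
    for (Map.Entry<String, Container> entry : containers.entrySet()) {
      Set<Object> dictionary = entry.getValue().dictionary;
      if (dictionary == null) {
        // Raw (non-dictionary-encoded) column, e.g. a metric: nothing to do.
        continue;
      }
      // Only dictionary-encoded columns reach the value read and the null check.
      Object value = row.get(entry.getKey());
      if (value == null) {
        throw new IllegalStateException("Null value for column: " + entry.getKey());
      }
      dictionary.add(value);
    }
  }

  public static void main(String[] args) {
    Map<String, Container> containers = new LinkedHashMap<>();
    containers.put("city", new Container(true));    // dictionary-encoded dimension
    containers.put("clicks", new Container(false)); // raw metric, absent from the row

    Map<String, Object> row = new HashMap<>();
    row.put("city", "SF");

    updateDictionary(containers, row);
    System.out.println(containers.get("city").dictionary);
  }
}
```

Because the raw `clicks` column is skipped first, its missing row value never reaches the null check.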



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -506,13 +510,26 @@ private PartitionUpsertMetadataManager.RecordInfo getRecordInfo(GenericRow row,
     return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
   }
 
+  /**
+   *

Review Comment:
   (nit) Remove this line



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -706,26 +728,74 @@ private void recordIndexingError(String indexType) {
 
   private void aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
-      String column = metricFieldSpec.getName();
+      String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName());

Review Comment:
   Suggest naming this `sourceColumn` and keeping the existing `column` variable.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1005,7 +1075,7 @@ private int getOrCreateDocId() {
    */
   private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentConfig config,

Review Comment:
   We can remove the return value and set both `_recordIdMap` and `_ingestionAggregator` in this method.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -815,7 +885,7 @@ public GenericRow getRecord(int docId, GenericRow reuse) {
    * Helper method to read the value for the given document id.
    */
   private static Object getValue(int docId, MutableForwardIndex forwardIndex, @Nullable MutableDictionary dictionary,
-      int maxNumMultiValues) {
+      int maxNumMultiValues, @Nullable ValueAggregator valueAggregator) {

Review Comment:
   `valueAggregator` is not needed



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java:
##########
@@ -360,6 +364,19 @@ private static void extractFieldsFromIngestionConfig(@Nullable IngestionConfig i
           fields.addAll(functionEvaluator.getArguments());
         }
       }
+      List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs();
+      if (aggregationConfigs != null) {
+        for (AggregationConfig aggregationConfig : aggregationConfigs) {
+          ExpressionContext expressionContext =
+              RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+          FunctionContext functionContext = expressionContext.getFunction();
+          if (functionContext != null) {
+            for (ExpressionContext argument : functionContext.getArguments()) {
+              fields.add(argument.getIdentifier());
+            }
+          }

Review Comment:
   This handles both identifiers and nested functions:
   ```suggestion
             expressionContext.getColumns(fields);
   ```
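
To show why a recursive collection covers both cases, here is a minimal, self-contained sketch. The `Expr` type is a hypothetical stand-in, not Pinot's `ExpressionContext`; it only mirrors the idea of a `getColumns(Set<String>)` call that descends into function arguments, whereas a loop over the top-level arguments alone would not reach identifiers inside a nested function.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ColumnCollectionSketch {
  /** Hypothetical expression node: either an identifier or a function over arguments. */
  static final class Expr {
    final String identifier;    // non-null for identifier nodes
    final List<Expr> arguments; // non-null for function nodes

    private Expr(String identifier, List<Expr> arguments) {
      this.identifier = identifier;
      this.arguments = arguments;
    }

    static Expr identifier(String name) {
      return new Expr(name, null);
    }

    static Expr function(Expr... args) {
      return new Expr(null, Arrays.asList(args));
    }

    /** Adds every identifier reachable from this node, descending into nested functions. */
    void getColumns(Set<String> columns) {
      if (identifier != null) {
        columns.add(identifier);
      } else {
        for (Expr argument : arguments) {
          argument.getColumns(columns);
        }
      }
    }
  }

  /** Convenience wrapper returning the collected columns in encounter order. */
  static Set<String> columnsOf(Expr expr) {
    Set<String> columns = new LinkedHashSet<>();
    expr.getColumns(columns);
    return columns;
  }

  public static void main(String[] args) {
    // Shape of SUM(price): a single identifier argument.
    Expr simple = Expr.function(Expr.identifier("price"));
    // Shape of SUM(ADD(price, tax)): identifiers sit one level deeper.
    Expr nested = Expr.function(Expr.function(Expr.identifier("price"), Expr.identifier("tax")));

    System.out.println(columnsOf(simple));
    System.out.println(columnsOf(nested));
  }
}
```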



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -586,18 +603,19 @@ private void addNewRow(int docId, GenericRow row) {
 
           // Update forward index
           DataType dataType = fieldSpec.getDataType();
+          value = column.equals("*") ? 1 : value;

Review Comment:
   For aggregated metrics, we should set the value to `ValueAggregator.getInitialAggregatedValue(value)`.
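
A minimal sketch of the idea, under stated assumptions: the `Aggregator` interface below is a simplified, hypothetical contract, not Pinot's actual `ValueAggregator`, and `applyRawValue` is an assumed name for the follow-up step. It shows how letting the aggregator produce the first stored value removes the need to special-case `"*"` at the call site, since a COUNT aggregator can return 1 regardless of the raw value.

```java
public class InitialValueSketch {
  /** Simplified, hypothetical aggregator contract (assumption, not Pinot's interface). */
  interface Aggregator {
    /** Value to store when a key is seen for the first time. */
    Number getInitialAggregatedValue(Object rawValue);

    /** Folds a later raw value into the stored aggregate. */
    Number applyRawValue(Number aggregated, Object rawValue);
  }

  /** COUNT ignores the raw value entirely, so a null raw value (as for "*") is fine. */
  static final Aggregator COUNT = new Aggregator() {
    @Override
    public Number getInitialAggregatedValue(Object rawValue) {
      return 1L;
    }

    @Override
    public Number applyRawValue(Number aggregated, Object rawValue) {
      return aggregated.longValue() + 1L;
    }
  };

  /** SUM starts from the first raw value and adds each later one. */
  static final Aggregator SUM = new Aggregator() {
    @Override
    public Number getInitialAggregatedValue(Object rawValue) {
      return ((Number) rawValue).doubleValue();
    }

    @Override
    public Number applyRawValue(Number aggregated, Object rawValue) {
      return aggregated.doubleValue() + ((Number) rawValue).doubleValue();
    }
  };

  /** First value goes through getInitialAggregatedValue, the rest through applyRawValue. */
  static Number aggregate(Aggregator aggregator, Object... rawValues) {
    Number aggregated = null;
    for (Object rawValue : rawValues) {
      aggregated = aggregated == null
          ? aggregator.getInitialAggregatedValue(rawValue)
          : aggregator.applyRawValue(aggregated, rawValue);
    }
    return aggregated;
  }

  public static void main(String[] args) {
    System.out.println(aggregate(COUNT, null, null, null));
    System.out.println(aggregate(SUM, 2, 3.5));
  }
}
```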



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -530,7 +547,7 @@ private void updateDictionary(GenericRow row) {
 
   private void addNewRow(int docId, GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
-      String column = entry.getKey();
+      String column = _ingestionAggregator.getMetricName(entry.getKey());

Review Comment:
   Suggest splitting the logic for aggregated metrics and regular columns. Mixing them complicates the handling logic and can lead to bugs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

