Jackie-Jiang commented on code in PR #8611:
URL: https://github.com/apache/pinot/pull/8611#discussion_r872883391
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import groovy.lang.Tuple2;
Review Comment:
(minor) Let's use `org.apache.commons.lang3.tuple.Pair` instead of
`groovy.lang.Tuple2`
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -336,10 +353,15 @@ public long getLatestIngestionTimestamp() {
// Null value vector
MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new
MutableNullValueVector() : null;
+ String sourceColumn = metricsAggregators.containsKey(column) ?
metricsAggregators.get(column).getFirst() : column;
+ ValueAggregator valueAggregator =
+ metricsAggregators.containsKey(column) ?
metricsAggregators.get(column).getSecond() : null;
Review Comment:
Reduce the map lookups
```suggestion
Pair<String, ValueAggregator> columnAggregatorPair =
metricsAggregators.get(column);
String sourceColumn = columnAggregatorPair != null ?
columnAggregatorPair.getLeft() : null;
ValueAggregator valueAggregator = columnAggregatorPair != null ?
columnAggregatorPair.getRight() : null;
```
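As a rough, stand-alone illustration of the single-lookup pattern: the sketch
below uses `java.util.Map.Entry` as a stand-in for
`org.apache.commons.lang3.tuple.Pair` so that it compiles with the JDK alone.
The class name `SingleLookupSketch`, the `String` placeholder for
`ValueAggregator`, and the sample column names are assumptions made for
illustration, not code from the PR.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of MutableSegmentImpl.
class SingleLookupSketch {
  // Destination column -> (source column, aggregator). The String aggregator
  // is a placeholder for a real ValueAggregator instance.
  static final Map<String, Map.Entry<String, String>> METRICS_AGGREGATORS = new HashMap<>();

  static {
    METRICS_AGGREGATORS.put("totalClicks", new SimpleImmutableEntry<>("clicks", "SUM"));
  }

  // A single get() replaces the two containsKey() + get() pairs.
  // Returns {sourceColumn, valueAggregator}; both are null when the column
  // has no aggregator configured.
  static String[] resolve(String column) {
    Map.Entry<String, String> pair = METRICS_AGGREGATORS.get(column);
    String sourceColumn = pair != null ? pair.getKey() : null;
    String valueAggregator = pair != null ? pair.getValue() : null;
    return new String[] {sourceColumn, valueAggregator};
  }
}
```

Note that with the suggestion, `sourceColumn` is `null` when no aggregator is
configured, whereas the original diff fell back to `column`.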
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -610,9 +664,13 @@ private void addNewRow(int docId, GenericRow row) {
"Unsupported data type: " + dataType + " for no-dictionary
column: " + column);
}
+ if (column.equals("*")) {
Review Comment:
We should never hit this branch
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1125,7 +1284,8 @@ private class IndexContainer implements Closeable {
@Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex
invertedIndex,
@Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex
textIndex,
@Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index
h3Index, @Nullable BloomFilterReader bloomFilter,
- @Nullable MutableNullValueVector nullValueVector) {
+ @Nullable MutableNullValueVector nullValueVector, String sourceColumn,
Review Comment:
(minor)
```suggestion
@Nullable MutableNullValueVector nullValueVector, @Nullable String
sourceColumn,
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig
tableConfig, @Nullable Sc
}
}
+ // Aggregation configs
+ List<AggregationConfig> aggregationConfigs =
ingestionConfig.getAggregationConfigs();
+ Set<String> aggregationSourceColumns = new HashSet<>();
+ if (aggregationConfigs != null) {
+ Preconditions.checkState(
+ !tableConfig.getIndexingConfig().isAggregateMetrics(),
+ "aggregateMetrics cannot be set with AggregationConfig");
+ Set<String> aggregationColumns = new HashSet<>();
+ for (AggregationConfig aggregationConfig : aggregationConfigs) {
+ String columnName = aggregationConfig.getColumnName();
+ if (schema != null) {
Review Comment:
Move this schema check after the `columnName`/`aggregationFunction` null
check below, or it will throw an NPE when `columnName` is null
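A minimal sketch of the intended ordering, with the null check first and the
schema lookup second. The class `AggregationConfigCheckSketch` and the
`Map<String, String>` that stands in for the schema (column name to field
type) are hypothetical simplifications, not Pinot's `Schema` API.

```java
import java.util.Map;

// Hypothetical stand-alone sketch; not part of TableConfigUtils.
class AggregationConfigCheckSketch {
  // schemaFieldTypes stands in for the schema: column name -> field type name.
  // It may be null, mirroring the @Nullable schema in the PR.
  static void validate(String columnName, String aggregationFunction,
      Map<String, String> schemaFieldTypes) {
    // 1. Null check first, so nothing below dereferences a null column name.
    if (columnName == null || aggregationFunction == null) {
      throw new IllegalStateException(
          "columnName/aggregationFunction cannot be null in AggregationConfig");
    }
    // 2. Only then consult the schema, if one was provided.
    if (schemaFieldTypes != null) {
      String fieldType = schemaFieldTypes.get(columnName);
      if (fieldType == null) {
        throw new IllegalStateException("The destination column '" + columnName
            + "' of the aggregation function must be present in the schema");
      }
      if (!"METRIC".equals(fieldType)) {
        throw new IllegalStateException("The destination column '" + columnName
            + "' of the aggregation function must be a metric column");
      }
    }
  }
}
```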
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig
tableConfig, @Nullable Sc
}
}
+ // Aggregation configs
+ List<AggregationConfig> aggregationConfigs =
ingestionConfig.getAggregationConfigs();
+ Set<String> aggregationSourceColumns = new HashSet<>();
+ if (aggregationConfigs != null) {
+ Preconditions.checkState(
+ !tableConfig.getIndexingConfig().isAggregateMetrics(),
+ "aggregateMetrics cannot be set with AggregationConfig");
+ Set<String> aggregationColumns = new HashSet<>();
+ for (AggregationConfig aggregationConfig : aggregationConfigs) {
+ String columnName = aggregationConfig.getColumnName();
+ if (schema != null) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ Preconditions.checkState(fieldSpec != null, "The destination
column '" + columnName
+ + "' of the aggregation function must be present in the
schema");
+ Preconditions.checkState(fieldSpec.getFieldType() ==
FieldSpec.FieldType.METRIC,
+ "The destination column '" + columnName + "' of the
aggregation function must be a metric column");
+ }
+ String aggregationFunction =
aggregationConfig.getAggregationFunction();
+ if (columnName == null || aggregationFunction == null) {
+ throw new IllegalStateException(
+ "columnName/aggregationFunction cannot be null in
AggregationConfig " + aggregationConfig);
+ }
+
+ if (!aggregationColumns.add(columnName)) {
+ throw new IllegalStateException("Duplicate aggregation config
found for column '" + columnName + "'");
+ }
+ ExpressionContext expressionContext;
+ try {
+ expressionContext =
RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Invalid aggregation function '" + aggregationFunction + "'
for column '" + columnName + "'", e);
+ }
+ Preconditions.checkState(expressionContext.getType() ==
ExpressionContext.Type.FUNCTION,
+ "aggregation function must be a function for: %s",
aggregationConfig);
+
+ FunctionContext functionContext = expressionContext.getFunction();
+ validateIngestionAggregation(functionContext.getFunctionName());
+ Preconditions.checkState(functionContext.getArguments().size() == 1,
+ "aggregation function can only have one argument: %s",
aggregationConfig);
+
+ ExpressionContext argument = functionContext.getArguments().get(0);
+ Preconditions.checkState(argument.getType() ==
ExpressionContext.Type.IDENTIFIER,
+ "aggregator function argument must be a identifier: %s",
aggregationConfig);
+
+ aggregationSourceColumns.add(argument.getIdentifier());
+ }
+ if (schema != null) {
+ Preconditions.checkState(new
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
Review Comment:
Should we consider using `SUM` on the metric itself as the default when no
aggregation is configured for it?
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -533,6 +556,37 @@ private void addNewRow(int docId, GenericRow row) {
String column = entry.getKey();
IndexContainer indexContainer = entry.getValue();
+ // aggregate metrics is enabled.
+ if (indexContainer._valueAggregator != null) {
+ Object value = row.getValue(indexContainer._sourceColumn);
+
+ // Update numValues info
+ indexContainer._numValuesInfo.updateSVEntry();
+
+ MutableForwardIndex forwardIndex = indexContainer._forwardIndex;
+ FieldSpec fieldSpec = indexContainer._fieldSpec;
+
+ DataType dataType = fieldSpec.getDataType();
+ value =
indexContainer._valueAggregator.getInitialAggregatedValue(value);
+ switch (dataType.getStoredType()) {
+ case INT:
+ forwardIndex.setInt(docId, ((Number) value).intValue());
+ break;
+ case LONG:
+ forwardIndex.setLong(docId, ((Number) value).longValue());
+ break;
+ case FLOAT:
+ forwardIndex.setFloat(docId, ((Number) value).floatValue());
+ break;
+ case DOUBLE:
+ forwardIndex.setDouble(docId, ((Number) value).doubleValue());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported data type: " + dataType + " for aggregation: " +
column);
+ }
Review Comment:
Should we `continue` here?
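To make the question concrete, here is a small sketch of what `continue`
would do in a loop of this shape: columns that have an aggregator are handled
up front and never reach the regular indexing path. The class
`ContinueSketch` and the map of column name to aggregator name are
hypothetical; whether the real loop should skip the rest of its body is for
the PR author to confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of MutableSegmentImpl.
class ContinueSketch {
  // Returns the columns that reach the regular (non-aggregated) indexing path.
  // A null map value means "no aggregator configured for this column".
  static List<String> regularlyIndexed(Map<String, String> aggregatorByColumn) {
    List<String> indexed = new ArrayList<>();
    for (Map.Entry<String, String> entry : aggregatorByColumn.entrySet()) {
      if (entry.getValue() != null) {
        // Aggregated metric: the forward index would be written here, and the
        // remaining per-column indexing below is skipped.
        continue;
      }
      indexed.add(entry.getKey());
    }
    return indexed;
  }
}
```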
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -363,6 +424,25 @@ public static void validateIngestionConfig(TableConfig
tableConfig, @Nullable Sc
}
}
+ /**
+ * Currently only, ValueAggregators with fixed width types are allowed, so
MIN, MAX, SUM, and COUNT. The reason
+ * is that only the {@link
org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex}
+ * supports random inserts and lookups. The
+ * {@link
org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex
only supports
+ * sequential inserts.
+ */
+ public static void validateIngestionAggregation(String name) {
+ List<AggregationFunctionType> allowed =
Review Comment:
Put this as a constant `EnumSet` (`private static final
EnumSet<AggregationFunctionType> SUPPORTED_INGESTION_AGGREGATIONS`)
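A sketch of what that could look like. The nested `FunctionType` enum is a
trimmed, hypothetical stand-in for Pinot's `AggregationFunctionType`, and the
name parsing and error messages are assumptions; the supported set (MIN, MAX,
SUM, COUNT) is taken from the Javadoc in the diff.

```java
import java.util.EnumSet;
import java.util.Locale;

// Hypothetical stand-alone sketch; not part of TableConfigUtils.
class IngestionAggregationSketch {
  // Stand-in for AggregationFunctionType, reduced to a handful of values.
  enum FunctionType { MIN, MAX, SUM, COUNT, DISTINCTCOUNT }

  // Built once at class load instead of on every validation call.
  private static final EnumSet<FunctionType> SUPPORTED_INGESTION_AGGREGATIONS =
      EnumSet.of(FunctionType.MIN, FunctionType.MAX, FunctionType.SUM, FunctionType.COUNT);

  static void validateIngestionAggregation(String name) {
    FunctionType type;
    try {
      type = FunctionType.valueOf(name.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalStateException("Unknown aggregation function: " + name, e);
    }
    if (!SUPPORTED_INGESTION_AGGREGATIONS.contains(type)) {
      throw new IllegalStateException(
          "Aggregation function " + name + " is not supported for ingestion aggregation");
    }
  }
}
```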
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -306,15 +311,71 @@ public static void validateIngestionConfig(TableConfig
tableConfig, @Nullable Sc
}
}
+ // Aggregation configs
+ List<AggregationConfig> aggregationConfigs =
ingestionConfig.getAggregationConfigs();
+ Set<String> aggregationSourceColumns = new HashSet<>();
+ if (aggregationConfigs != null) {
Review Comment:
To be consistent with the check in the mutable segment
```suggestion
if (!CollectionUtils.isEmpty(aggregationConfigs)) {
```
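For reference, assuming `CollectionUtils` here is the Apache Commons
Collections class, `isEmpty` treats both `null` and an empty collection as
empty. A JDK-only equivalent (the class `EmptyCheckSketch` is hypothetical):

```java
import java.util.Collection;

// Hypothetical stand-alone sketch of a null-or-empty check.
class EmptyCheckSketch {
  // True when there is nothing to validate: either no list was configured
  // (null) or an empty list was configured.
  static boolean isEmpty(Collection<?> collection) {
    return collection == null || collection.isEmpty();
  }
}
```

With this check, an empty `aggregationConfigs` list skips the validation
block in the same way as a missing one.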
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -1098,6 +1200,61 @@ void updateMVEntry(int numValuesInMVEntry) {
}
}
+ static class IngestionAggregator {
Review Comment:
(minor) I don't see much value in this inner util class. I suggest removing
the class and renaming the method to `getMetricsAggregators()`