noon-stripe commented on code in PR #8611: URL: https://github.com/apache/pinot/pull/8611#discussion_r869698932
########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/AggregationConfig.java: ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table; Review Comment: done ########## pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java: ########## @@ -31,7 +31,6 @@ import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; - Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> destColumnToSrcColumn = new HashMap<>(); + Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>(); + + for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) { + ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction()); + + // validation is also done when the table is created, this is just a sanity check. + Preconditions.checkState(!segmentConfig.aggregateMetrics(), Review Comment: done ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java: ########## @@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion") private final List<TransformConfig> _transformConfigs; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") + private final List<AggregationConfig> _aggregationConfigs; Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); + for (AggregationConfig aggregationConfig : aggregationConfigs) { + String columnName = aggregationConfig.getColumnName(); + if (schema != null) { + Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName + + "' of the aggregation function must be present in the schema"); + } + String aggregationFunction = aggregationConfig.getAggregationFunction(); + if (columnName == null || aggregationFunction == null) { + throw new IllegalStateException( + "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig); + } + + if (!aggregationColumns.add(columnName)) { + throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'"); + } + ExpressionContext expressionContext; + try { + expressionContext = RequestContextUtils.getExpressionFromSQL(aggregationConfig.getAggregationFunction()); + } catch (Exception e) { + throw new IllegalStateException( + "Invalid aggregation function '" + aggregationFunction + "' for column '" + columnName + "'", e); + } + Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION, + "aggregation function must be a function for: %s", aggregationConfig); + + FunctionContext functionContext = expressionContext.getFunction(); + validateIngestionAggregation(functionContext.getFunctionName()); + Preconditions.checkState(functionContext.getArguments().size() == 1, + "aggregation function can only have one argument: %s", aggregationConfig); + + ExpressionContext argument = functionContext.getArguments().get(0); + Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER, + "aggregator function argument must be a identifier: %s", aggregationConfig); + + Preconditions.checkState(schema.getFieldSpecFor(argument.getIdentifier()) == null, Review Comment: fixed ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { Review Comment: done ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java: ########## @@ -44,18 +45,23 @@ public class IngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Configs related to record transformation functions applied during ingestion") private final List<TransformConfig> _transformConfigs; + @JsonPropertyDescription("Configs related to record aggregation function applied during ingestion") + private final List<AggregationConfig> _aggregationConfigs; + @JsonPropertyDescription("Config related to handling complex type") private final ComplexTypeConfig _complexTypeConfig; @JsonCreator public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable BatchIngestionConfig batchIngestionConfig, @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig streamIngestionConfig, @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig, + @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> aggregationConfigs, Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + static public void validateIngestionAggregation(String name) { + /** + * Currently only, ValueAggregators with fixed width types are allowed, so MIN, MAX, SUM, and COUNT. The reason + * is that only the {@link org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex} + * supports random inserts and lookups. The + * {@link org.apache.pinot.segment.local.realtime.impl.forward.VarByteSVMutableForwardIndex only supports + * sequential inserts. + */ Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); Review Comment: done, added test ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> destColumnToSrcColumn = new HashMap<>(); + Map<String, ValueAggregator> destColumnToValueAggregators = new HashMap<>(); + + for (AggregationConfig config : segmentConfig.getIngestionAggregationConfigs()) { + ExpressionContext expressionContext = RequestContextUtils.getExpressionFromSQL(config.getAggregationFunction()); Review Comment: done ########## pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java: ########## @@ -281,9 +284,10 @@ public void testSerDe() Map<String, String> prefixesToRename = new HashMap<>(); IngestionConfig ingestionConfig = new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, "APPEND", "HOURLY"), - new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), transformConfigs, - new ComplexTypeConfig(fieldsToUnnest, ".", - ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, prefixesToRename)); + new StreamIngestionConfig(streamConfigMaps), new FilterConfig("filterFunc(foo)"), aggregationConfigs, Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -363,6 +420,25 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + static public void validateIngestionAggregation(String name) { Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -1058,6 +1105,10 @@ private IdMap<FixedIntArray> enableMetricsAggregationIfPossible(RealtimeSegmentC RECORD_ID_MAP); } + private boolean isAggregateMetricsEnabled() { + return _aggregateMetrics || _ingestionAggregator.isEnabled(); Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -682,26 +707,43 @@ private void recordIndexingError(String indexType) { private void aggregateMetrics(GenericRow row, int docId) { for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) { - String column = metricFieldSpec.getName(); + String column = _ingestionAggregator.getMetricName(metricFieldSpec.getName()); Object value = row.getValue(column); - MutableForwardIndex forwardIndex = _indexContainerMap.get(column)._forwardIndex; + MutableForwardIndex forwardIndex = _indexContainerMap.get(metricFieldSpec.getName())._forwardIndex; DataType dataType = metricFieldSpec.getDataType(); - switch (dataType) { - case INT: - forwardIndex.setInt(docId, (Integer) value + forwardIndex.getInt(docId)); - break; - case LONG: - forwardIndex.setLong(docId, (Long) value + forwardIndex.getLong(docId)); - break; - case FLOAT: - forwardIndex.setFloat(docId, (Float) value + forwardIndex.getFloat(docId)); - break; - case DOUBLE: - forwardIndex.setDouble(docId, (Double) value + forwardIndex.getDouble(docId)); - break; - default: - throw new UnsupportedOperationException( - "Unsupported data type: " + dataType + " for aggregate metric column: " + column); + ValueAggregator valueAggregator = _ingestionAggregator.getAggregator(metricFieldSpec.getName()); + + if (valueAggregator != null) { + switch (valueAggregator.getAggregatedValueType()) { Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java: ########## @@ -306,15 +311,67 @@ public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Sc } } + // Aggregation configs + List<AggregationConfig> aggregationConfigs = ingestionConfig.getAggregationConfigs(); + Set<String> aggregationSourceColumns = new HashSet<>(); + if (aggregationConfigs != null) { + Preconditions.checkState( + tableConfig.getIndexingConfig() == null || !tableConfig.getIndexingConfig().isAggregateMetrics(), + "aggregateMetrics cannot be set with AggregationConfig"); + Set<String> aggregationColumns = new HashSet<>(); + for (AggregationConfig aggregationConfig : aggregationConfigs) { + String columnName = aggregationConfig.getColumnName(); + if (schema != null) { + Preconditions.checkState(schema.getFieldSpecFor(columnName) != null, "The destination column '" + columnName Review Comment: done and added test ########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java: ########## @@ -40,7 +42,24 @@ private MutableSegmentImplTestUtils() { private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; private static final String SEGMENT_NAME = "testSegment__0__0__155555"; - private static final String STEAM_NAME = "testStream"; + private static final String STREAM_NAME = "testStream"; + + public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns, Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); + } + + Map<String, String> destColumnToSrcColumn = new HashMap<>(); Review Comment: done ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ingestionaggregation/IngestionAggregator.java: ########## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils.ingestionaggregation; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FunctionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.utils.TableConfigUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.config.table.AggregationConfig; + + +public class IngestionAggregator { + private final Map<String, String> _aggregatorColumnNameToMetricColumnName; + private final Map<String, ValueAggregator> _aggregatorColumnNameToValueAggregator; + private boolean _disabled; + + public static IngestionAggregator fromRealtimeSegmentConfig(RealtimeSegmentConfig segmentConfig) { + if (segmentConfig == null || segmentConfig.getIngestionAggregationConfigs() == null + || segmentConfig.getIngestionAggregationConfigs().size() == 0) { + return new IngestionAggregator(new HashMap<>(), new HashMap<>()); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org