vvivekiyer commented on code in PR #10845:
URL: https://github.com/apache/pinot/pull/10845#discussion_r1237801796
########## pinot-common/src/main/java/org/apache/pinot/common/request/context/IdentifierContext.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.request.context;
+
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+
+/**
+ * The {@code IdentifierContext} class represents an identifier in the query.
+ * <p>This class includes information about an identifier. The v1 engine uses column names for identifiers. The
+ * multistage query engine uses ordinals to distinctly track each identifier. So this context is set up to support
+ * both v1 and multistage engine identifiers.
+ */
+public class IdentifierContext {

Review Comment:
   The V1 engine uses the actual column name to fetch values. In the V2 engine, we need the ordinal info (index) to fetch values from container rows. As you suggested, I did think about constructing a custom name like "$colname_$ordinal" to use in the Identifier, but that would involve expensive string operations to extract the ordinal from the identifier name (in `extractIntermediateValue`), so I decided against it. Please let me know your thoughts.
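To make that trade-off concrete, here is a minimal, self-contained sketch contrasting the two approaches. The class and method names (`OrdinalLookupSketch`, `readViaOrdinal`, `readViaSyntheticName`) are illustrative only, not the PR's actual API:

```java
// Illustrative sketch only: contrasts storing the ordinal as an int field with
// encoding it into a synthetic "$colname_$ordinal" identifier name.
public class OrdinalLookupSketch {

  // Approach described in the comment above: keep the ordinal as an int,
  // so reading a value from a container row is a plain array index.
  static Object readViaOrdinal(Object[] row, int ordinal) {
    return row[ordinal];
  }

  // Rejected alternative: recover the ordinal from a synthetic name on every
  // row, paying for lastIndexOf/substring/parseInt in the hot path.
  static Object readViaSyntheticName(Object[] row, String identifierName) {
    int sep = identifierName.lastIndexOf('_');
    int ordinal = Integer.parseInt(identifierName.substring(sep + 1));
    return row[ordinal];
  }

  public static void main(String[] args) {
    Object[] row = {"group-a", 42L};
    System.out.println(readViaOrdinal(row, 1));              // 42
    System.out.println(readViaSyntheticName(row, "$col_1")); // 42
  }
}
```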
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NewAggregateOperator.java:
##########
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.IntermediateStageBlockValSet;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggFunctionQueryContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * AggregateOperator is used to aggregate values over a set of group by keys.
+ * Output data will be in the format of [group by key, aggregate result1, ... aggregate resultN]
+ * Currently, we only support the following aggregation functions:
+ * 1. SUM
+ * 2. COUNT
+ * 3. MIN
+ * 4. MAX
+ * 5. DistinctCount and Count(Distinct)
+ * 6. AVG
+ * 7. FourthMoment
+ * 8. BoolAnd and BoolOr
+ *
+ * When the list of aggregation calls is empty, this class is used to calculate distinct results based on group by
+ * keys. In this case, the input can be any type.
+ *
+ * If the list of aggregation calls is not empty, the input of aggregation has to be a number.
+ *
+ * Note: This class performs aggregation over the double value of input.
+ * If the input is single value, the output type will be the input type. Otherwise, the output type will be double.
+ */
+// TODO: Rename to AggregateOperator when merging Planner support.
+public class NewAggregateOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+
+  private final MultiStageOperator _inputOperator;
+  private final DataSchema _resultSchema;
+
+  // Aggregation containers
+  private final AggregationFunction[] _aggregationFunctions;
+  private final AggregationResultHolder[] _aggregationResultHolders;
+
+  // Group By containers
+  private final List<ExpressionContext> _groupSet;
+  private final GroupByResultHolder[] _groupByResultHolders;
+  // Mapping from the group by row-key to the values in the row.
+  private final Map<Key, Object[]> _groupByKeyHolder;
+  // groupId and groupIdMap are used to create a 0-based index for group-by keys instead of using the hash value
+  // directly - similar to GroupByKeyGenerator. This is useful when we invoke the aggregation functions because they
+  // use the group by key indexes to store results.
+  private int _groupId = 0;
+  private Map<Integer, Integer> _groupIdMap;
+
+  private TransferableBlock _upstreamErrorBlock;
+  private boolean _readyToConstruct;
+  private boolean _hasReturnedAggregateBlock;
+
+  // Denotes whether this aggregation operator should merge intermediate results.
+  private boolean _isMergeAggregation;
+
+  // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
+  // aggCalls has to be a list of FunctionCall and cannot be null
+  // groupSet has to be a list of InputRef and cannot be null
+  // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
+
+  @VisibleForTesting
+  public NewAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator,
+      DataSchema resultSchema, List<FunctionContext> functionContexts, List<ExpressionContext> groupSet,
+      boolean isMergeAggregation, boolean isSingleStageAggregation) {
+    super(context);
+    _inputOperator = inputOperator;
+    _resultSchema = resultSchema;
+
+    _groupSet = groupSet;
+    _groupByKeyHolder = new HashMap<>();
+    _groupIdMap = new HashMap<>();
+    _aggregationFunctions = new AggregationFunction[functionContexts.size()];
+    _aggregationResultHolders = new AggregationResultHolder[functionContexts.size()];
+    _groupByResultHolders = new GroupByResultHolder[functionContexts.size()];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i),
+          new AggFunctionQueryContext(true));
+      _aggregationResultHolders[i] = _aggregationFunctions[i].createAggregationResultHolder();
+      _groupByResultHolders[i] = _aggregationFunctions[i].createGroupByResultHolder(
+          InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+    }
+
+    _upstreamErrorBlock = null;
+    _readyToConstruct = false;
+    _hasReturnedAggregateBlock = false;
+    _isMergeAggregation = isMergeAggregation && !isSingleStageAggregation;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      if (!_readyToConstruct && !consumeInputBlocks()) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      }
+
+      if (!_hasReturnedAggregateBlock) {
+        return produceAggregatedBlock();
+      } else {
+        // TODO: Move to close call.
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  private TransferableBlock produceAggregatedBlock() {
+    List<Object[]> rows = _groupSet.isEmpty() ? collectAggregationResultRows() : collectGroupByResultRows();
+
+    _hasReturnedAggregateBlock = true;
+    if (rows.size() == 0) {
+      if (_groupSet.size() == 0) {
+        return constructEmptyAggResultBlock();
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } else {
+      return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
+    }
+  }
+
+  private List<Object[]> collectAggregationResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      row[i] = aggregationFunction.extractAggregationResult(_aggregationResultHolders[i]);
+    }
+    rows.add(row);
+    return rows;
+  }
+
+  private List<Object[]> collectGroupByResultRows() {
+    List<Object[]> rows = new ArrayList<>();
+    for (Map.Entry<Key, Object[]> e : _groupByKeyHolder.entrySet()) {
+      Object[] row = new Object[_aggregationFunctions.length + _groupSet.size()];
+      Object[] keyElements = e.getValue();
+      System.arraycopy(keyElements, 0, row, 0, keyElements.length);
+      for (int i = 0; i < _aggregationFunctions.length; i++) {
+        row[i + _groupSet.size()] = _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i],
+            _groupIdMap.get(e.getKey().hashCode()));
+      }
+      rows.add(row);
+    }
+    return rows;
+  }
+
+  /**
+   * @return an empty agg result block for non-group-by aggregation.
+   */
+  private TransferableBlock constructEmptyAggResultBlock() {
+    Object[] row = new Object[_aggregationFunctions.length];
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggFunction = _aggregationFunctions[i];
+      row[i] = aggFunction.extractAggregationResult(aggFunction.createAggregationResultHolder());
+    }
+    return new TransferableBlock(Collections.singletonList(row), _resultSchema, DataBlock.Type.ROW);
+  }
+
+  /**
+   * @return whether the operator is ready to move on (EOS or ERROR)
+   */
+  private boolean consumeInputBlocks() {
+    TransferableBlock block = _inputOperator.nextBlock();
+    while (!block.isNoOpBlock()) {
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+        return true;
+      } else if (block.isEndOfStreamBlock()) {
+        _readyToConstruct = true;
+        return true;
+      }
+
+      List<Object[]> container = block.getContainer();
+      if (_isMergeAggregation) {
+        mergeIntermediateValues(container);
+      } else {
+        aggregateValues(container);
+      }
+
+      block = _inputOperator.nextBlock();
+    }
+    return false;
+  }
+
+  private void mergeIntermediateValues(List<Object[]> container) {
+    if (_groupSet.isEmpty()) {
+      performMergeAggregation(container);
+    } else {
+      performMergeGroupBy(container);
+    }
+  }
+
+  private void performMergeAggregation(List<Object[]> container) {
+    // Simple aggregation function.
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+      for (Object[] row : container) {
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge,
+            _aggregationResultHolders[i]);
+      }
+    }
+  }
+
+  private void performMergeGroupBy(List<Object[]> container) {
+    // Create group by keys for each row.
+    int[] intKeys = generateGroupByKeys(container);
+
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      GroupByResultHolder groupByResultHolder = _groupByResultHolders[i];
+      groupByResultHolder.ensureCapacity(_groupIdMap.size());
+      List<ExpressionContext> expressions = _aggregationFunctions[i].getInputExpressions();
+
+      for (int j = 0; j < container.size(); j++) {
+        Object[] row = container.get(j);
+        Object intermediateResultToMerge = extractIntermediateValue(row, expressions);
+        _aggregationFunctions[i].mergeAndUpdateResultHolder(intermediateResultToMerge, groupByResultHolder,
+            intKeys[j]);
+      }
+    }
+  }
+
+  Object extractIntermediateValue(Object[] row, List<ExpressionContext> expressions) {
+    // TODO: Add support to handle aggregation functions where:
+    //       1. The identifier need not be the first argument
+    //       2. There is more than one identifier.
+    Preconditions.checkState(expressions.size() <= 1);
+    Preconditions.checkState(!expressions.isEmpty());
+    ExpressionContext expr = expressions.get(0);
+
+    Object result = expr.getType().equals(ExpressionContext.Type.IDENTIFIER) ? row[expr.getIdentifierIndex()]
+        : expr.getLiteral().getValue();
+    return result;
+  }
+
+  public void aggregateValues(List<Object[]> container) {
+    // Convert row to columnar representation
+    Map<Integer, List<Object>> columnValuesMap = new HashMap<>();

Review Comment:
   Answered below.
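For context on the row-to-columnar step that `aggregateValues` starts here (and that the next comment discusses), a minimal, self-contained sketch of the pivot follows. The names `RowToColumnarSketch` and `toColumnar` are illustrative, not the PR's API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: pivot a row-major container (List<Object[]>) into
// per-column value lists, the shape consumed by a columnar BlockValSet view.
public class RowToColumnarSketch {

  static Map<Integer, List<Object>> toColumnar(List<Object[]> container, int numColumns) {
    Map<Integer, List<Object>> columnValuesMap = new HashMap<>();
    for (int col = 0; col < numColumns; col++) {
      columnValuesMap.put(col, new ArrayList<>(container.size()));
    }
    for (Object[] row : container) {
      for (int col = 0; col < numColumns; col++) {
        // Values are already Objects in the row container; no extra boxing here.
        columnValuesMap.get(col).add(row[col]);
      }
    }
    return columnValuesMap;
  }

  public static void main(String[] args) {
    List<Object[]> container = List.of(new Object[]{"a", 1L}, new Object[]{"b", 2L});
    System.out.println(toColumnar(container, 2)); // {0=[a, b], 1=[1, 2]}
  }
}
```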
########## pinot-core/src/main/java/org/apache/pinot/core/common/IntermediateStageBlockValSet.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * In the multistage engine, the leaf stage servers process the data in columnar fashion. By the time the
+ * intermediate stage receives the projected columns, they have been converted to a row-based format. This class
+ * provides the capability to convert the row-based representation back into blocks so that they can be used to
+ * process aggregations.
+ * TODO: Support MV
+ */
+public class IntermediateStageBlockValSet implements BlockValSet {
+  private final FieldSpec.DataType _dataType;
+  private final PinotDataType _pinotDataType;
+  private final List<Object> _values;
+  private final RoaringBitmap _nullBitMap;
+  private boolean _nullBitMapSet;
+
+  public IntermediateStageBlockValSet(DataSchema.ColumnDataType columnDataType, List<Object> values) {

Review Comment:
   We just fetch the `Object` value from every row of the container and pass it as a list of objects here; there is no boxing to `Object` happening at this point. Am I missing something? Are you suggesting that when we call `DataBlockUtils.extractRows()` we could avoid the boxing to `Object`, since we already know the type, and then use the typed values here in `IntermediateStageBlockValSet`? If yes, this could be a great optimization to pick up orthogonally to this PR, and it would also address your comment above about using DataBlockUtils.

   > is there any better way of handling this? converting row to columnar should be done via RowDataBlock and ColumnDataBlock abstraction?
   >
   > I agree we can do the refactoring later but was wondering if there's any way we can directly extract this part into a DataBlockUtils

########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggFunctionQueryContext.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * The <code>AggFunctionQueryContext</code> class contains extracted details from QueryContext that can be used for
+ * Aggregation Functions.
+ */
+public class AggFunctionQueryContext {

Review Comment:
   `AggregationFunctionFactory.getAggregationFunction()` previously used QueryContext to figure out details like null handling, limit, and order-by. We need this refactor for this effort because the intermediate operators are unaware of QueryContext.

########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java:
##########
@@ -106,6 +106,27 @@ void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder
    */
   IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
 
+  /**
+   * Merges two intermediate results and also updates the aggregation result holder. This is needed when aggregation
+   * is processed in multiple stages, to store the intermediate results.
+   */
+  default void mergeAndUpdateResultHolder(IntermediateResult intermediateResult,
+      AggregationResultHolder aggregationResultHolder) {
+    // TODO: Remove when support for all aggregation functions is added to the Multistage engine.

Review Comment:
   The end state is that every aggregation function that is yet to be supported (e.g. mode, covariance) will have its own implementation of `mergeAndUpdateResultHolder()`.
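To illustrate the end state described above, here is a hypothetical, standalone sketch of what one per-function override might look like, using a MAX-style merge. The `DoubleHolder` type below only mirrors the shape of Pinot's result holders for illustration; it is not the PR's actual signature:

```java
// Hypothetical sketch of a per-function mergeAndUpdateResultHolder override,
// shown for a MAX-style function whose intermediate result is a Double.
public class MaxMergeSketch {

  // Stand-in for AggregationResultHolder; illustration only.
  static class DoubleHolder {
    double _value = Double.NEGATIVE_INFINITY;
  }

  // MAX merge: keep the larger of the running value and the incoming
  // intermediate result produced by an upstream stage.
  static void mergeAndUpdateResultHolder(Double intermediateResult, DoubleHolder holder) {
    if (intermediateResult != null && intermediateResult > holder._value) {
      holder._value = intermediateResult;
    }
  }

  public static void main(String[] args) {
    DoubleHolder holder = new DoubleHolder();
    mergeAndUpdateResultHolder(3.0, holder);
    mergeAndUpdateResultHolder(7.5, holder);
    mergeAndUpdateResultHolder(null, holder); // null intermediate results are skipped
    System.out.println(holder._value); // 7.5
  }
}
```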
########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java:
##########
@@ -187,17 +189,19 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
     } else {
       switch (AggregationFunctionType.valueOf(upperCaseFunctionName)) {
         case COUNT:
-          return new CountAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          return new CountAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
         case MIN:
-          return new MinAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          return new MinAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
         case MAX:
-          return new MaxAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+          return new MaxAggregationFunction(firstArgument, aggFunctionQueryContext.isNullHandlingEnabled());
+        // TODO(Sonam): Uncomment SUM0 when merging planner changes
         case SUM:
-          return new SumAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
+        // case SUM0:

Review Comment:
   The return type, etc., is set by the planner changes in [AggregationFunctionType.java](https://github.com/apache/pinot/pull/10846/files#diff-57508c9bebed5cdf9f9fcdbf19728615ae9f224f9e09e42de03e60f5235c7306)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org