Jackie-Jiang commented on code in PR #10000: URL: https://github.com/apache/pinot/pull/10000#discussion_r1060909219
########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java: ########## @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.trace.Tracing; + + +/** + * The <code>FilteredGroupByOperator</code> class provides the operator for group-by query on a single segment when + * there are 1 or more filter expressions on aggregations. + */ +@SuppressWarnings("rawtypes") +public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { + private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED"; + + @Nullable + private final AggregationFunction[] _aggregationFunctions; + private final List<Pair<AggregationFunction[], TransformOperator>> _aggFunctionsWithTransformOperator; + private final ExpressionContext[] _groupByExpressions; + private final long _numTotalDocs; + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + private final DataSchema _dataSchema; + private final QueryContext _queryContext; + private TableResizer _tableResizer; + private GroupKeyGenerator _groupKeyGenerator = null; Review Comment: (minor) These 2 member variables can be converted to local ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java: ########## @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.trace.Tracing; + + +/** + * The <code>FilteredGroupByOperator</code> class provides the operator for group-by query on a single segment when + * there are 1 or more filter expressions on aggregations. + */ +@SuppressWarnings("rawtypes") +public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { + private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED"; + + @Nullable + private final AggregationFunction[] _aggregationFunctions; + private final List<Pair<AggregationFunction[], TransformOperator>> _aggFunctionsWithTransformOperator; + private final ExpressionContext[] _groupByExpressions; + private final long _numTotalDocs; + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + private final DataSchema _dataSchema; + private final QueryContext _queryContext; + private TableResizer _tableResizer; + private GroupKeyGenerator _groupKeyGenerator = null; + + public FilteredGroupByOperator( + @Nullable AggregationFunction[] aggregationFunctions, Review Comment: I don't think it can be null. Actually if it is null, line 83 will throw NPE ########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java: ########## @@ -71,11 +82,13 @@ public static AggregationFunctionColumnPair getAggregationFunctionColumnPair( * <p>NOTE: We don't need to consider order-by columns here as the ordering is only allowed for aggregation functions * or group-by expressions. */ - public static Set<ExpressionContext> collectExpressionsToTransform(AggregationFunction[] aggregationFunctions, - @Nullable ExpressionContext[] groupByExpressions) { + public static Set<ExpressionContext> collectExpressionsToTransform( + @Nullable AggregationFunction[] aggregationFunctions, @Nullable ExpressionContext[] groupByExpressions) { Review Comment: `aggregationFunctions` should always be non-null ########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java: ########## @@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements GroupByExecutor { protected final AggregationFunction[] _aggregationFunctions; protected final boolean _nullHandlingEnabled; - protected final GroupKeyGenerator _groupKeyGenerator; - protected final GroupByResultHolder[] _groupByResultHolders; protected final boolean _hasMVGroupByExpression; protected final int[] _svGroupKeys; protected final int[][] _mvGroupKeys; + protected GroupKeyGenerator _groupKeyGenerator; + protected GroupByResultHolder[] _groupByResultHolders; + + public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] groupByExpressions, + TransformOperator transformOperator) { + Review Comment: (nit) Remove empty line ########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java: ########## @@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements GroupByExecutor { protected final AggregationFunction[] _aggregationFunctions; protected final boolean _nullHandlingEnabled; - protected final GroupKeyGenerator _groupKeyGenerator; - protected final GroupByResultHolder[] _groupByResultHolders; protected final boolean _hasMVGroupByExpression; protected final int[] _svGroupKeys; protected final int[][] _mvGroupKeys; + protected GroupKeyGenerator _groupKeyGenerator; + protected GroupByResultHolder[] _groupByResultHolders; Review Comment: These 2 variables can still be `final` ########## pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java: ########## @@ -335,10 +335,13 @@ public void testMixedAggregationsOfSameType() { testQuery(filterQuery, nonFilterQuery); } - @Test(expectedExceptions = IllegalStateException.class) + @Test public void testGroupBySupport() { - String filterQuery = "SELECT MIN(INT_COL) FILTER(WHERE NO_INDEX_COL > 2), MAX(INT_COL) FILTER(WHERE INT_COL > 2) " - + "FROM MyTable WHERE INT_COL < 1000 GROUP BY INT_COL"; - getBrokerResponse(filterQuery); + String filterQuery = + "SELECT SUM(INT_COL), SUM(INT_COL) FILTER(WHERE INT_COL > 25000) AS total_sum FROM MyTable GROUP BY INT_COL"; + String nonFilterQuery = + "SELECT SUM(INT_COL), SUM(CASE WHEN INT_COL > 25000 THEN INT_COL ELSE 0 END) AS total_sum FROM MyTable GROUP " + + "BY INT_COL"; + testQuery(filterQuery, nonFilterQuery); Review Comment: Let's add some more tests to guarantee that it works as expected ########## pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java: ########## @@ -50,10 +54,35 @@ public GroupByPlanNode(IndexSegment indexSegment, QueryContext queryContext) { } @Override - public GroupByOperator run() { + public Operator<GroupByResultsBlock> run() { assert _queryContext.getAggregationFunctions() != null; assert _queryContext.getGroupByExpressions() != null; + if (_queryContext.hasFilteredAggregations()) { + return filteredGroupByPlan(); + } + + return nonFilteredGroupByPlan(); + } + + private FilteredGroupByOperator filteredGroupByPlan() { Review Comment: (minor) Rename to `buildFilteredGroupByPlan()` and `buildNonFilteredGroupByPlan()` ########## pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByTrimTest.java: ########## @@ -130,7 +129,7 @@ void testGroupByTrim(QueryContext queryContext, int minSegmentGroupTrimSize, int // Extract the execution result List<Pair<Double, Double>> extractedResult = extractTestResult(resultsBlock.getTable()); - assertEquals(extractedResult, expectedResult); + Assert.assertEquals(extractedResult, expectedResult); Review Comment: (nit) We usually use static import for `Assert` in test ########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java: ########## @@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements GroupByExecutor { protected final AggregationFunction[] _aggregationFunctions; protected final boolean _nullHandlingEnabled; - protected final GroupKeyGenerator _groupKeyGenerator; - protected final GroupByResultHolder[] _groupByResultHolders; protected final boolean _hasMVGroupByExpression; protected final int[] _svGroupKeys; protected final int[][] _mvGroupKeys; + protected GroupKeyGenerator _groupKeyGenerator; + protected GroupByResultHolder[] _groupByResultHolders; + + public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] groupByExpressions, + TransformOperator transformOperator) { + + this(queryContext, queryContext.getAggregationFunctions(), groupByExpressions, transformOperator, null); + } + + public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] aggregationFunctions, + ExpressionContext[] groupByExpressions, TransformOperator transformOperator) { + + this(queryContext, aggregationFunctions, groupByExpressions, transformOperator, null); + } /** * Constructor for the class. * * @param queryContext Query context + * @param aggregationFunctions Aggregation functions * @param groupByExpressions Array of group-by expressions * @param transformOperator Transform operator */ - public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] groupByExpressions, - TransformOperator transformOperator) { - _aggregationFunctions = queryContext.getAggregationFunctions(); + public DefaultGroupByExecutor(QueryContext queryContext, AggregationFunction[] aggregationFunctions, + ExpressionContext[] groupByExpressions, TransformOperator transformOperator, + GroupKeyGenerator groupKeyGenerator) { Review Comment: Annotate `groupKeyGenerator` as `Nullable` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java: ########## @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.trace.Tracing; + + +/** + * The <code>FilteredGroupByOperator</code> class provides the operator for group-by query on a single segment when + * there are 1 or more filter expressions on aggregations. + */ +@SuppressWarnings("rawtypes") +public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { + private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED"; + + @Nullable + private final AggregationFunction[] _aggregationFunctions; + private final List<Pair<AggregationFunction[], TransformOperator>> _aggFunctionsWithTransformOperator; + private final ExpressionContext[] _groupByExpressions; + private final long _numTotalDocs; + private long _numDocsScanned; + private long _numEntriesScannedInFilter; + private long _numEntriesScannedPostFilter; + private final DataSchema _dataSchema; + private final QueryContext _queryContext; + private TableResizer _tableResizer; + private GroupKeyGenerator _groupKeyGenerator = null; + + public FilteredGroupByOperator( + @Nullable AggregationFunction[] aggregationFunctions, + List<Pair<AggregationFunction[], TransformOperator>> aggFunctionsWithTransformOperator, + ExpressionContext[] groupByExpressions, + long numTotalDocs, + QueryContext queryContext) { + _aggregationFunctions = aggregationFunctions; + _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator; + _groupByExpressions = groupByExpressions; + _numTotalDocs = numTotalDocs; + _queryContext = queryContext; + _tableResizer = null; + + // NOTE: The indexedTable expects that the data schema will have group by columns before aggregation columns + int numGroupByExpressions = groupByExpressions.length; + int numAggregationFunctions = aggregationFunctions.length; + int numColumns = numGroupByExpressions + numAggregationFunctions; + String[] columnNames = new String[numColumns]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns]; + + // Extract column names and data types for group-by columns + for (int i = 0; i < numGroupByExpressions; i++) { + ExpressionContext groupByExpression = groupByExpressions[i]; + columnNames[i] = groupByExpression.toString(); + columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV( + aggFunctionsWithTransformOperator.get(i).getRight().getResultMetadata(groupByExpression).getDataType()); + } + + // Extract column names and data types for aggregation functions + for (int i = 0; i < numAggregationFunctions; i++) { + AggregationFunction aggregationFunction = aggregationFunctions[i]; + int index = numGroupByExpressions + i; + columnNames[index] = aggregationFunction.getResultColumnName(); + columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType(); + } + + _dataSchema = new DataSchema(columnNames, columnDataTypes); + } + + @Override + protected GroupByResultsBlock getNextBlock() { + // TODO(egalpin): Support Startree query resolution when possible, even with FILTER expressions + assert _aggregationFunctions != null; + boolean numGroupsLimitReached = false; + int numAggregations = _aggregationFunctions.length; + + GroupByResultHolder[] groupByResultHolders = new GroupByResultHolder[numAggregations]; + IdentityHashMap<AggregationFunction, Integer> resultHolderIndexMap = new IdentityHashMap<>(numAggregations); + for (int i = 0; i < numAggregations; i++) { + resultHolderIndexMap.put(_aggregationFunctions[i], i); + } + + for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : _aggFunctionsWithTransformOperator) { + TransformOperator transformOperator = filteredAggregation.getRight(); + AggregationFunction[] filteredAggFunctions = filteredAggregation.getLeft(); + + // Perform aggregation group-by on all the blocks + DefaultGroupByExecutor groupByExecutor; + if (_groupKeyGenerator == null) { Review Comment: This is very smart to share the same group-key generator. Let's add some comments about why we need to do so -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org