Jackie-Jiang commented on code in PR #10000:
URL: https://github.com/apache/pinot/pull/10000#discussion_r1060909219


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.trace.Tracing;
+
+
+/**
+ * The <code>FilteredGroupByOperator</code> class provides the operator for 
group-by query on a single segment when
+ * there are 1 or more filter expressions on aggregations.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> 
{
+  private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED";
+
+  @Nullable
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> 
_aggFunctionsWithTransformOperator;
+  private final ExpressionContext[] _groupByExpressions;
+  private final long _numTotalDocs;
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private final DataSchema _dataSchema;
+  private final QueryContext _queryContext;
+  private TableResizer _tableResizer;
+  private GroupKeyGenerator _groupKeyGenerator = null;

Review Comment:
   (minor) These 2 member variables can be converted to local variables.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.trace.Tracing;
+
+
+/**
+ * The <code>FilteredGroupByOperator</code> class provides the operator for 
group-by query on a single segment when
+ * there are 1 or more filter expressions on aggregations.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> 
{
+  private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED";
+
+  @Nullable
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> 
_aggFunctionsWithTransformOperator;
+  private final ExpressionContext[] _groupByExpressions;
+  private final long _numTotalDocs;
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private final DataSchema _dataSchema;
+  private final QueryContext _queryContext;
+  private TableResizer _tableResizer;
+  private GroupKeyGenerator _groupKeyGenerator = null;
+
+  public FilteredGroupByOperator(
+      @Nullable AggregationFunction[] aggregationFunctions,

Review Comment:
   I don't think it can be null. In fact, if it were null, line 83 would throw an NPE.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java:
##########
@@ -71,11 +82,13 @@ public static AggregationFunctionColumnPair 
getAggregationFunctionColumnPair(
    * <p>NOTE: We don't need to consider order-by columns here as the ordering 
is only allowed for aggregation functions
    *          or group-by expressions.
    */
-  public static Set<ExpressionContext> 
collectExpressionsToTransform(AggregationFunction[] aggregationFunctions,
-      @Nullable ExpressionContext[] groupByExpressions) {
+  public static Set<ExpressionContext> collectExpressionsToTransform(
+      @Nullable AggregationFunction[] aggregationFunctions, @Nullable 
ExpressionContext[] groupByExpressions) {

Review Comment:
   `aggregationFunctions` should always be non-null



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java:
##########
@@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements 
GroupByExecutor {
 
   protected final AggregationFunction[] _aggregationFunctions;
   protected final boolean _nullHandlingEnabled;
-  protected final GroupKeyGenerator _groupKeyGenerator;
-  protected final GroupByResultHolder[] _groupByResultHolders;
   protected final boolean _hasMVGroupByExpression;
   protected final int[] _svGroupKeys;
   protected final int[][] _mvGroupKeys;
+  protected GroupKeyGenerator _groupKeyGenerator;
+  protected GroupByResultHolder[] _groupByResultHolders;
+
+  public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] 
groupByExpressions,
+      TransformOperator transformOperator) {
+

Review Comment:
   (nit) Remove empty line



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java:
##########
@@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements 
GroupByExecutor {
 
   protected final AggregationFunction[] _aggregationFunctions;
   protected final boolean _nullHandlingEnabled;
-  protected final GroupKeyGenerator _groupKeyGenerator;
-  protected final GroupByResultHolder[] _groupByResultHolders;
   protected final boolean _hasMVGroupByExpression;
   protected final int[] _svGroupKeys;
   protected final int[][] _mvGroupKeys;
+  protected GroupKeyGenerator _groupKeyGenerator;
+  protected GroupByResultHolder[] _groupByResultHolders;

Review Comment:
   These 2 variables can still be `final`



##########
pinot-core/src/test/java/org/apache/pinot/queries/FilteredAggregationsTest.java:
##########
@@ -335,10 +335,13 @@ public void testMixedAggregationsOfSameType() {
     testQuery(filterQuery, nonFilterQuery);
   }
 
-  @Test(expectedExceptions = IllegalStateException.class)
+  @Test
   public void testGroupBySupport() {
-    String filterQuery = "SELECT MIN(INT_COL) FILTER(WHERE NO_INDEX_COL > 2), 
MAX(INT_COL) FILTER(WHERE INT_COL > 2) "
-        + "FROM MyTable WHERE INT_COL < 1000 GROUP BY INT_COL";
-    getBrokerResponse(filterQuery);
+    String filterQuery =
+        "SELECT SUM(INT_COL), SUM(INT_COL) FILTER(WHERE INT_COL > 25000) AS 
total_sum FROM MyTable GROUP BY INT_COL";
+    String nonFilterQuery =
+        "SELECT SUM(INT_COL), SUM(CASE WHEN INT_COL > 25000 THEN INT_COL ELSE 
0 END) AS total_sum FROM MyTable GROUP "
+            + "BY INT_COL";
+    testQuery(filterQuery, nonFilterQuery);

Review Comment:
   Let's add more tests to verify that it works as expected.



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java:
##########
@@ -50,10 +54,35 @@ public GroupByPlanNode(IndexSegment indexSegment, 
QueryContext queryContext) {
   }
 
   @Override
-  public GroupByOperator run() {
+  public Operator<GroupByResultsBlock> run() {
     assert _queryContext.getAggregationFunctions() != null;
     assert _queryContext.getGroupByExpressions() != null;
 
+    if (_queryContext.hasFilteredAggregations()) {
+      return filteredGroupByPlan();
+    }
+
+    return nonFilteredGroupByPlan();
+  }
+
+  private FilteredGroupByOperator filteredGroupByPlan() {

Review Comment:
   (minor) Rename these to `buildFilteredGroupByPlan()` and 
`buildNonFilteredGroupByPlan()` respectively.



##########
pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByTrimTest.java:
##########
@@ -130,7 +129,7 @@ void testGroupByTrim(QueryContext queryContext, int 
minSegmentGroupTrimSize, int
     // Extract the execution result
     List<Pair<Double, Double>> extractedResult = 
extractTestResult(resultsBlock.getTable());
 
-    assertEquals(extractedResult, expectedResult);
+    Assert.assertEquals(extractedResult, expectedResult);

Review Comment:
   (nit) We usually use a static import for `Assert` methods in tests.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java:
##########
@@ -52,24 +52,39 @@ public class DefaultGroupByExecutor implements 
GroupByExecutor {
 
   protected final AggregationFunction[] _aggregationFunctions;
   protected final boolean _nullHandlingEnabled;
-  protected final GroupKeyGenerator _groupKeyGenerator;
-  protected final GroupByResultHolder[] _groupByResultHolders;
   protected final boolean _hasMVGroupByExpression;
   protected final int[] _svGroupKeys;
   protected final int[][] _mvGroupKeys;
+  protected GroupKeyGenerator _groupKeyGenerator;
+  protected GroupByResultHolder[] _groupByResultHolders;
+
+  public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] 
groupByExpressions,
+      TransformOperator transformOperator) {
+
+    this(queryContext, queryContext.getAggregationFunctions(), 
groupByExpressions, transformOperator, null);
+  }
+
+  public DefaultGroupByExecutor(QueryContext queryContext, 
AggregationFunction[] aggregationFunctions,
+      ExpressionContext[] groupByExpressions, TransformOperator 
transformOperator) {
+
+    this(queryContext, aggregationFunctions, groupByExpressions, 
transformOperator, null);
+  }
 
   /**
    * Constructor for the class.
    *
    * @param queryContext Query context
+   * @param aggregationFunctions Aggregation functions
    * @param groupByExpressions Array of group-by expressions
    * @param transformOperator Transform operator
    */
-  public DefaultGroupByExecutor(QueryContext queryContext, ExpressionContext[] 
groupByExpressions,
-      TransformOperator transformOperator) {
-    _aggregationFunctions = queryContext.getAggregationFunctions();
+  public DefaultGroupByExecutor(QueryContext queryContext, 
AggregationFunction[] aggregationFunctions,
+      ExpressionContext[] groupByExpressions, TransformOperator 
transformOperator,
+      GroupKeyGenerator groupKeyGenerator) {

Review Comment:
   Annotate `groupKeyGenerator` as `@Nullable`.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.trace.Tracing;
+
+
+/**
+ * The <code>FilteredGroupByOperator</code> class provides the operator for 
group-by query on a single segment when
+ * there are 1 or more filter expressions on aggregations.
+ */
+@SuppressWarnings("rawtypes")
+public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> 
{
+  private static final String EXPLAIN_NAME = "GROUP_BY_FILTERED";
+
+  @Nullable
+  private final AggregationFunction[] _aggregationFunctions;
+  private final List<Pair<AggregationFunction[], TransformOperator>> 
_aggFunctionsWithTransformOperator;
+  private final ExpressionContext[] _groupByExpressions;
+  private final long _numTotalDocs;
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private final DataSchema _dataSchema;
+  private final QueryContext _queryContext;
+  private TableResizer _tableResizer;
+  private GroupKeyGenerator _groupKeyGenerator = null;
+
+  public FilteredGroupByOperator(
+      @Nullable AggregationFunction[] aggregationFunctions,
+      List<Pair<AggregationFunction[], TransformOperator>> 
aggFunctionsWithTransformOperator,
+      ExpressionContext[] groupByExpressions,
+      long numTotalDocs,
+      QueryContext queryContext) {
+    _aggregationFunctions = aggregationFunctions;
+    _aggFunctionsWithTransformOperator = aggFunctionsWithTransformOperator;
+    _groupByExpressions = groupByExpressions;
+    _numTotalDocs = numTotalDocs;
+    _queryContext = queryContext;
+    _tableResizer = null;
+
+    // NOTE: The indexedTable expects that the data schema will have group by 
columns before aggregation columns
+    int numGroupByExpressions = groupByExpressions.length;
+    int numAggregationFunctions = aggregationFunctions.length;
+    int numColumns = numGroupByExpressions + numAggregationFunctions;
+    String[] columnNames = new String[numColumns];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numColumns];
+
+    // Extract column names and data types for group-by columns
+    for (int i = 0; i < numGroupByExpressions; i++) {
+      ExpressionContext groupByExpression = groupByExpressions[i];
+      columnNames[i] = groupByExpression.toString();
+      columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV(
+          
aggFunctionsWithTransformOperator.get(i).getRight().getResultMetadata(groupByExpression).getDataType());
+    }
+
+    // Extract column names and data types for aggregation functions
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      AggregationFunction aggregationFunction = aggregationFunctions[i];
+      int index = numGroupByExpressions + i;
+      columnNames[index] = aggregationFunction.getResultColumnName();
+      columnDataTypes[index] = 
aggregationFunction.getIntermediateResultColumnType();
+    }
+
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+  }
+
+  @Override
+  protected GroupByResultsBlock getNextBlock() {
+    // TODO(egalpin): Support Startree query resolution when possible, even 
with FILTER expressions
+    assert _aggregationFunctions != null;
+    boolean numGroupsLimitReached = false;
+    int numAggregations = _aggregationFunctions.length;
+
+    GroupByResultHolder[] groupByResultHolders = new 
GroupByResultHolder[numAggregations];
+    IdentityHashMap<AggregationFunction, Integer> resultHolderIndexMap = new 
IdentityHashMap<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      resultHolderIndexMap.put(_aggregationFunctions[i], i);
+    }
+
+    for (Pair<AggregationFunction[], TransformOperator> filteredAggregation : 
_aggFunctionsWithTransformOperator) {
+      TransformOperator transformOperator = filteredAggregation.getRight();
+      AggregationFunction[] filteredAggFunctions = 
filteredAggregation.getLeft();
+
+      // Perform aggregation group-by on all the blocks
+      DefaultGroupByExecutor groupByExecutor;
+      if (_groupKeyGenerator == null) {

Review Comment:
   Sharing the same group-key generator is a very smart approach. Let's add some 
comments explaining why we need to do so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to