Jackie-Jiang commented on a change in pull request #7916:
URL: https://github.com/apache/pinot/pull/7916#discussion_r794904539



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -62,31 +68,164 @@ public AggregationPlanNode(IndexSegment indexSegment, 
QueryContext queryContext)
   public Operator<IntermediateResultsBlock> run() {
     assert _queryContext.getAggregationFunctions() != null;
 
+    boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+    if (hasFilteredPredicates) {
+      return buildFilteredAggOperator();
+    }
+
+    return buildNonFilteredAggOperator();

Review comment:
       (minor) Can be simplified for better readability
   ```suggestion
       return _queryContext.isHasFilteredAggregations() ? 
buildFilteredAggOperator() : buildNonFilteredAggOperator();
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -62,31 +68,164 @@ public AggregationPlanNode(IndexSegment indexSegment, 
QueryContext queryContext)
   public Operator<IntermediateResultsBlock> run() {
     assert _queryContext.getAggregationFunctions() != null;
 
+    boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+    if (hasFilteredPredicates) {
+      return buildFilteredAggOperator();
+    }
+
+    return buildNonFilteredAggOperator();
+  }
+
+  /**
+   * Returns {@code true} if the given aggregations can be solved with segment 
metadata, {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
+   */
+  private static boolean isFitForMetadataBasedPlan(AggregationFunction[] 
aggregationFunctions) {
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      if (aggregationFunction.getType() != AggregationFunctionType.COUNT) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns {@code true} if the given aggregations can be solved with 
dictionary, {@code false} otherwise.
+   * <p>Aggregations supported: MIN, MAX, MIN_MAX_RANGE, DISTINCT_COUNT, 
SEGMENT_PARTITIONED_DISTINCT_COUNT
+   */
+  private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] 
aggregationFunctions,
+      IndexSegment indexSegment) {
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      AggregationFunctionType functionType = aggregationFunction.getType();
+      if (functionType != AggregationFunctionType.MIN && functionType != 
AggregationFunctionType.MAX
+          && functionType != AggregationFunctionType.MINMAXRANGE
+          && functionType != AggregationFunctionType.DISTINCTCOUNT
+          && functionType != 
AggregationFunctionType.SEGMENTPARTITIONEDDISTINCTCOUNT) {
+        return false;
+      }
+      ExpressionContext argument = (ExpressionContext) 
aggregationFunction.getInputExpressions().get(0);
+      if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
+        return false;
+      }
+      String column = argument.getIdentifier();
+      Dictionary dictionary = 
indexSegment.getDataSource(column).getDictionary();
+      if (dictionary == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Build a FilteredAggregationOperator given the parameters.
+   * @param mainPredicateFilterOperator Filter operator corresponding to the 
main predicate
+   * @param mainTransformOperator Transform operator corresponding to the main 
predicate
+   * @param numTotalDocs Number of total docs
+   */
+  private BaseOperator<IntermediateResultsBlock> 
buildOperatorForFilteredAggregations(
+      BaseFilterOperator mainPredicateFilterOperator,
+      TransformOperator mainTransformOperator, int numTotalDocs) {
+    Map<FilterContext, Pair<List<AggregationFunction>, TransformOperator>> 
filterContextToAggFuncsMap =
+        new HashMap<>();
+    List<AggregationFunction> nonFilteredAggregationFunctions = new 
ArrayList<>();
+    List<Pair<AggregationFunction, FilterContext>> aggregationFunctions = 
_queryContext
+        .getFilteredAggregations();
+
+    // For each aggregation function, check if the aggregation function is a 
filtered agg.
+    // If it is, populate the corresponding filter operator and corresponding 
transform operator
+    for (Pair<AggregationFunction, FilterContext> inputPair : 
aggregationFunctions) {
+      if (inputPair.getLeft() != null) {
+        FilterContext currentFilterExpression = inputPair.getRight();
+        if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
+          
filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getLeft());
+          continue;
+        }
+        Pair<FilterPlanNode, BaseFilterOperator> pair =
+            buildFilterOperator(currentFilterExpression);
+        BaseFilterOperator wrappedFilterOperator = new 
CombinedFilterOperator(mainPredicateFilterOperator,
+            pair.getRight());
+        TransformOperator newTransformOperator = 
buildTransformOperatorForFilteredAggregates(wrappedFilterOperator);
+        // For each transform operator, associate it with the underlying 
expression. This allows
+        // fetching the relevant TransformOperator when resolving blocks 
during aggregation
+        // execution
+        List<AggregationFunction> aggFunctionList = new ArrayList<>();
+        aggFunctionList.add(inputPair.getLeft());
+        filterContextToAggFuncsMap.put(currentFilterExpression,
+            Pair.of(aggFunctionList, newTransformOperator));
+      } else {
+        nonFilteredAggregationFunctions.add(inputPair.getLeft());
+      }
+    }
+    List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
+        new ArrayList<>();
+    // Convert to array since FilteredAggregationOperator expects it
+    for (Pair<List<AggregationFunction>, TransformOperator> pair
+        : filterContextToAggFuncsMap.values()) {
+      List<AggregationFunction> aggregationFunctionList = pair.getLeft();
+      if (aggregationFunctionList == null) {
+        throw new IllegalStateException("Null aggregation list seen");
+      }
+      aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new 
AggregationFunction[0]),
+          pair.getRight()));
+    }
+    
aggToTransformOpList.add(Pair.of(nonFilteredAggregationFunctions.toArray(new 
AggregationFunction[0]),
+        mainTransformOperator));
+
+    return new 
FilteredAggregationOperator(_queryContext.getAggregationFunctions(), 
aggToTransformOpList,
+        numTotalDocs);
+  }
+
+  /**
+   * Build a filter operator from the given FilterContext.
+   *
+   * It returns the FilterPlanNode to allow reusing plan level components such 
as predicate
+   * evaluator map
+   */
+  private Pair<FilterPlanNode, BaseFilterOperator> 
buildFilterOperator(FilterContext filterContext) {
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext, filterContext);
+
+    return Pair.of(filterPlanNode, filterPlanNode.run());
+  }
+
+  /**
+   * Build transform and aggregation operators for the given bottom level plan.
+   * Note that this method operates only for non filtered aggregates.
+   * @param filterOperator Filter operator to be used in the corresponding 
chain
+   * @param filterPlanNode Plan node associated with the filter operator
+   * @return Pair, consisting of the built TransformOperator and Aggregation 
operator for chain
+   */
+  private BaseOperator<IntermediateResultsBlock> 
buildOperatorsForNonFilteredAggs(BaseFilterOperator filterOperator,

Review comment:
       Shall we revert the changes within this method? The changes in this 
method appear to be unrelated to the FILTER feature.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -62,31 +68,164 @@ public AggregationPlanNode(IndexSegment indexSegment, 
QueryContext queryContext)
   public Operator<IntermediateResultsBlock> run() {
     assert _queryContext.getAggregationFunctions() != null;
 
+    boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+    if (hasFilteredPredicates) {
+      return buildFilteredAggOperator();
+    }
+
+    return buildNonFilteredAggOperator();
+  }
+
+  /**
+   * Returns {@code true} if the given aggregations can be solved with segment 
metadata, {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
+   */
+  private static boolean isFitForMetadataBasedPlan(AggregationFunction[] 
aggregationFunctions) {
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      if (aggregationFunction.getType() != AggregationFunctionType.COUNT) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns {@code true} if the given aggregations can be solved with 
dictionary, {@code false} otherwise.
+   * <p>Aggregations supported: MIN, MAX, MIN_MAX_RANGE, DISTINCT_COUNT, 
SEGMENT_PARTITIONED_DISTINCT_COUNT
+   */
+  private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] 
aggregationFunctions,
+      IndexSegment indexSegment) {
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      AggregationFunctionType functionType = aggregationFunction.getType();
+      if (functionType != AggregationFunctionType.MIN && functionType != 
AggregationFunctionType.MAX
+          && functionType != AggregationFunctionType.MINMAXRANGE
+          && functionType != AggregationFunctionType.DISTINCTCOUNT
+          && functionType != 
AggregationFunctionType.SEGMENTPARTITIONEDDISTINCTCOUNT) {
+        return false;
+      }
+      ExpressionContext argument = (ExpressionContext) 
aggregationFunction.getInputExpressions().get(0);
+      if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
+        return false;
+      }
+      String column = argument.getIdentifier();
+      Dictionary dictionary = 
indexSegment.getDataSource(column).getDictionary();
+      if (dictionary == null) {
+        return false;
+      }
+    }
+    return true;
+  }

Review comment:
       Can we move these 2 methods back to their original location? They are not 
changed, but moving them makes code review and future tracking harder.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -441,76 +460,105 @@ public QueryContext build() {
      */
     private void generateAggregationFunctions(QueryContext queryContext) {
       List<AggregationFunction> aggregationFunctions = new ArrayList<>();
-      List<Pair<AggregationFunction, FilterContext>> 
filteredAggregationFunctions = new ArrayList<>();
+      List<Pair<AggregationFunction, FilterContext>> filteredAggregations = 
new ArrayList<>();
       Map<FunctionContext, Integer> aggregationFunctionIndexMap = new 
HashMap<>();
+      Map<Pair<FunctionContext, FilterContext>, Integer> 
filterExpressionIndexMap = new HashMap<>();
 
       // Add aggregation functions in the SELECT clause
       // NOTE: DO NOT deduplicate the aggregation functions in the SELECT 
clause because that involves protocol change.
-      List<FunctionContext> aggregationsInSelect = new ArrayList<>();
-      List<Pair<FunctionContext, FilterContext>> filteredAggregations = new 
ArrayList<>();
+      List<Pair<FilterContext, FunctionContext>> aggregationsInSelect = new 
ArrayList<>();
       for (ExpressionContext selectExpression : 
queryContext._selectExpressions) {
-        getAggregations(selectExpression, aggregationsInSelect, 
filteredAggregations);
+        getAggregations(selectExpression, aggregationsInSelect);
       }
-      for (FunctionContext function : aggregationsInSelect) {
-        int functionIndex = aggregationFunctions.size();
+      for (Pair<FilterContext, FunctionContext> pair : aggregationsInSelect) {
+        FunctionContext function = pair.getRight();
+        int functionIndex = filteredAggregations.size();
         AggregationFunction aggregationFunction =
             AggregationFunctionFactory.getAggregationFunction(function, 
queryContext);
-        aggregationFunctions.add(aggregationFunction);
+
+        FilterContext filterContext = null;
+        // If the left pair is not null, implies a filtered aggregation
+        if (pair.getLeft() != null) {
+          if (_groupByExpressions != null) {
+            throw new IllegalStateException("GROUP BY with FILTER clauses is 
not supported");
+          }
+
+          queryContext._hasFilteredAggregations = true;
+
+          filterContext = pair.getLeft();
+
+          Pair<FunctionContext, FilterContext> filterContextPair =
+              Pair.of(function, filterContext);
+
+          if (!filterExpressionIndexMap.containsKey(filterContextPair)) {
+            int filterMapIndex = filterExpressionIndexMap.size();
+
+            filterExpressionIndexMap.put(filterContextPair, filterMapIndex);
+          }
+        }
+        filteredAggregations.add(Pair.of(aggregationFunction, filterContext));
+

Review comment:
       Shall we reduce some empty lines in this part of the code?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -154,4 +129,168 @@ private static boolean 
isFitForDictionaryBasedPlan(AggregationFunction[] aggrega
     }
     return true;
   }
+
+  /**
+   * Build a FilteredAggregationOperator given the parameters.
+   * @param mainPredicateFilterOperator Filter operator corresponding to the 
main predicate
+   * @param mainTransformOperator Transform operator corresponding to the main 
predicate
+   * @param aggregationFunctions Aggregation functions in the query
+   * @param numTotalDocs Number of total docs
+   */
+  private BaseOperator<IntermediateResultsBlock> 
buildOperatorForFilteredAggregations(
+      BaseFilterOperator mainPredicateFilterOperator,

Review comment:
       (code format) Please apply the Pinot code format and use it to 
auto-reformat this file. Several changes do not comply with the format.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to