atris commented on a change in pull request #7916:
URL: https://github.com/apache/pinot/pull/7916#discussion_r794293449



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -154,4 +118,181 @@ private static boolean 
isFitForDictionaryBasedPlan(AggregationFunction[] aggrega
     }
     return true;
   }
+
+  /**
+   * Build a FilteredAggregationOperator given the parameters.
+   * @param mainPredicateFilterOperator Filter operator corresponding to the 
main predicate
+   * @param mainTransformOperator Transform operator corresponding to the main 
predicate
+   * @param numTotalDocs Number of total docs
+   */
+  private BaseOperator<IntermediateResultsBlock> 
buildOperatorForFilteredAggregations(
+      BaseFilterOperator mainPredicateFilterOperator,
+      TransformOperator mainTransformOperator, int numTotalDocs) {
+    Map<FilterContext, Pair<List<AggregationFunction>, TransformOperator>> 
filterContextToAggFuncsMap =
+        new HashMap<>();
+    List<AggregationFunction> nonFilteredAggregationFunctions = new 
ArrayList<>();
+    List<Pair<FilterContext, AggregationFunction>> aggregationFunctions = 
_queryContext
+        .getAggregationsWithFilters();
+
+    // For each aggregation function, check if the aggregation function is a 
filtered agg.
+    // If it is, populate the corresponding filter operator and corresponding 
transform operator
+    for (Pair<FilterContext, AggregationFunction> inputPair : 
aggregationFunctions) {
+      if (inputPair.getLeft() != null) {
+        FilterContext currentFilterExpression = inputPair.getLeft();
+        if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
+          
filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getRight());
+          continue;
+        }
+        Pair<FilterPlanNode, BaseFilterOperator> pair =
+            buildFilterOperator(currentFilterExpression);
+        BaseFilterOperator wrappedFilterOperator = new 
CombinedFilterOperator(mainPredicateFilterOperator,
+            pair.getRight());
+        Pair<TransformOperator,
+            BaseOperator<IntermediateResultsBlock>> innerPair =
+            buildOperators(wrappedFilterOperator, pair.getLeft());
+        // For each transform operator, associate it with the underlying 
expression. This allows
+        // fetching the relevant TransformOperator when resolving blocks 
during aggregation
+        // execution
+        List aggFunctionList = new ArrayList<>();
+        aggFunctionList.add(inputPair.getRight());
+        filterContextToAggFuncsMap.put(currentFilterExpression,
+            Pair.of(aggFunctionList, innerPair.getLeft()));
+      } else {
+        nonFilteredAggregationFunctions.add(inputPair.getRight());
+      }
+    }
+    List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
+        new ArrayList<>();
+    // Convert to array since FilteredAggregationOperator expects it
+    for (Pair<List<AggregationFunction>, TransformOperator> pair
+        : filterContextToAggFuncsMap.values()) {
+      List<AggregationFunction> aggregationFunctionList = pair.getLeft();
+      if (aggregationFunctionList == null) {
+        throw new IllegalStateException("Null aggregation list seen");
+      }
+      aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new 
AggregationFunction[0]),
+          pair.getRight()));
+    }
+    
aggToTransformOpList.add(Pair.of(nonFilteredAggregationFunctions.toArray(new 
AggregationFunction[0]),
+        mainTransformOperator));
+
+    return new 
FilteredAggregationOperator(_queryContext.getAggregationFunctions(), 
aggToTransformOpList,
+        numTotalDocs);
+  }
+
+  /**
+   * Build a filter operator from the given FilterContext.
+   *
+   * It returns the FilterPlanNode to allow reusing plan level components such 
as predicate
+   * evaluator map
+   */
+  private Pair<FilterPlanNode, BaseFilterOperator> 
buildFilterOperator(FilterContext filterContext) {
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext, filterContext);
+
+    return Pair.of(filterPlanNode, filterPlanNode.run());
+  }
+
+  /**
+   * Build transform and aggregation operators for the given bottom level plan
+   * @param filterOperator Filter operator to be used in the corresponding 
chain
+   * @param filterPlanNode Plan node associated with the filter operator
+   * @return Pair, consisting of the built TransformOperator and Aggregation 
operator for chain
+   */
+  private Pair<TransformOperator,
+      BaseOperator<IntermediateResultsBlock>> 
buildOperators(BaseFilterOperator filterOperator,
+      FilterPlanNode filterPlanNode) {
+    assert _queryContext.getAggregationFunctions() != null;
+
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    AggregationFunction[] aggregationFunctions = 
_queryContext.getAggregationFunctions();
+
+    Set<ExpressionContext> expressionsToTransform =
+        
AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, 
null);
+    boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+
+    List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
+
+    // Use metadata/dictionary to solve the query if possible
+    // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) 
can be optimized
+    if (filterOperator.isResultMatchingAll() && !hasFilteredPredicates) {
+      if (isFitForMetadataBasedPlan(aggregationFunctions)) {
+        return Pair.of(null, new 
MetadataBasedAggregationOperator(aggregationFunctions,
+            _indexSegment.getSegmentMetadata(), Collections.emptyMap()));
+      } else if (isFitForDictionaryBasedPlan(aggregationFunctions, 
_indexSegment)) {
+        Map<String, Dictionary> dictionaryMap = new HashMap<>();
+        for (AggregationFunction aggregationFunction : aggregationFunctions) {
+          String column = ((ExpressionContext) 
aggregationFunction.getInputExpressions().get(0)).getIdentifier();
+          dictionaryMap.computeIfAbsent(column, k -> 
_indexSegment.getDataSource(k).getDictionary());
+        }
+        return Pair.of(null,
+            new DictionaryBasedAggregationOperator(aggregationFunctions, 
dictionaryMap,
+            numTotalDocs));
+      }
+    }
+
+    if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext)) 
{
+      // Use star-tree to solve the query if possible
+
+      AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+          StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
+      if (aggregationFunctionColumnPairs != null) {
+        Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
+            StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, 
_queryContext.getFilter(),
+                filterPlanNode.getPredicateEvaluatorMap());
+        if (predicateEvaluatorsMap != null) {
+          for (StarTreeV2 starTreeV2 : starTrees) {
+            if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), 
aggregationFunctionColumnPairs, null,
+                predicateEvaluatorsMap.keySet())) {
+
+              TransformOperator transformOperator = new 
StarTreeTransformPlanNode(starTreeV2,
+                  aggregationFunctionColumnPairs, null,
+                      predicateEvaluatorsMap, 
_queryContext.getDebugOptions()).run();
+              AggregationOperator aggregationOperator = new 
AggregationOperator(aggregationFunctions,
+                  transformOperator, numTotalDocs, true);
+
+              return Pair.of(transformOperator, aggregationOperator);
+            }
+          }
+        }
+      }
+    }
+
+    TransformOperator transformOperator = new TransformPlanNode(_indexSegment, 
_queryContext,
+          expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL, 
filterOperator).run();
+    AggregationOperator aggregationOperator = new 
AggregationOperator(aggregationFunctions,
+        transformOperator, numTotalDocs, false);
+
+    return Pair.of(transformOperator, aggregationOperator);
+  }
+
+  /**
+   * Builds the operator to be used for non filtered aggregations
+   */
+  private BaseOperator<IntermediateResultsBlock> buildNonFilteredAggOperator() 
{

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to