amrishlal commented on a change in pull request #7916:
URL: https://github.com/apache/pinot/pull/7916#discussion_r785075210



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -154,4 +138,151 @@ private static boolean 
isFitForDictionaryBasedPlan(AggregationFunction[] aggrega
     }
     return true;
   }
+
+  /**
+   * Build a CombinedTransformOperator given the main predicate filter 
operator and the corresponding
+   * aggregation functions.
+   * @param nonFilteredTransformOperator Transform operator corresponding to 
the main predicate
+   * @param mainPredicateFilterOperator Filter operator corresponding to the 
main predicate
+   * @param expressionsToTransform Expressions to transform
+   * @param aggregationFunctions Aggregation functions in the query
+   */
+  private TransformOperator 
buildOperatorForFilteredAggregations(TransformOperator 
nonFilteredTransformOperator,
+      BaseFilterOperator mainPredicateFilterOperator,
+      Set<ExpressionContext> expressionsToTransform,
+      AggregationFunction[] aggregationFunctions) {
+    Map<ExpressionContext, TransformOperator> transformOperatorMap = new 
HashMap<>();
+    List<Pair<ExpressionContext, BaseFilterOperator>> baseFilterOperatorList =
+        new ArrayList<>();
+    List<Pair<ExpressionContext, Pair<FilterPlanNode, BaseFilterOperator>>> 
filterPredicatesAndMetadata =
+        new ArrayList<>();
+
+    // For each aggregation function, check if the aggregation function is a 
filtered agg.
+    // If it is, populate the corresponding filter operator and metadata
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      if (aggregationFunction instanceof FilterableAggregationFunction) {
+        FilterableAggregationFunction filterableAggregationFunction =
+            (FilterableAggregationFunction) aggregationFunction;
+        Pair<FilterPlanNode, BaseFilterOperator> pair =
+            
buildFilterOperator(filterableAggregationFunction.getFilterContext());
+
+        
baseFilterOperatorList.add(Pair.of(filterableAggregationFunction.getAssociatedExpressionContext(),
+            pair.getRight()));
+        
filterPredicatesAndMetadata.add(Pair.of(filterableAggregationFunction.getAssociatedExpressionContext(),
+            pair));
+      }
+    }
+
+    CombinedFilterOperator combinedFilterOperator = new 
CombinedFilterOperator(baseFilterOperatorList,
+        mainPredicateFilterOperator);
+
+    // For each transform operator, associate it with the underlying 
expression. This allows
+    // fetching the relevant TransformOperator when resolving blocks during 
aggregation
+    // execution
+    for (Pair<ExpressionContext, Pair<FilterPlanNode, BaseFilterOperator>> pair
+        : filterPredicatesAndMetadata) {
+      Pair<TransformOperator,
+          BaseOperator<IntermediateResultsBlock>> innerPair =
+          buildOperators(combinedFilterOperator, pair.getRight().getLeft(),
+              true, pair.getLeft());
+
+      transformOperatorMap.put(pair.getLeft(), innerPair.getLeft());
+    }
+
+    // Add the main predicate filter operator to the map
+    transformOperatorMap.put(_queryContext.getFilterExpression(), 
nonFilteredTransformOperator);
+
+    return new CombinedTransformOperator(transformOperatorMap, 
_queryContext.getFilterExpression(),
+        expressionsToTransform);
+  }
+
+  /**
+   * Build a filter operator from the given FilterContext.
+   *
+   * It returns the FilterPlanNode to allow reusing plan level components such 
as predicate
+   * evaluator map
+   */
+  private Pair<FilterPlanNode, BaseFilterOperator> 
buildFilterOperator(FilterContext filterContext) {
+    FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext, filterContext);
+
+    return Pair.of(filterPlanNode, filterPlanNode.run());
+  }
+
+  /**
+   * Build transform and aggregation operators for the given bottom level plan
+   * @param filterOperator Filter operator to be used in the corresponding 
chain
+   * @param filterPlanNode Plan node associated with the filter operator
+   * @param isSwimlane Is this plan a swim lane?

Review comment:
       This doesn't appear to be resolved. I understand that you have a design 
doc, but it will be useful to describe "SwimLane" in code as well to make the 
code more readable since references to design doc tend to get lost over time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to