walterddr commented on code in PR #11607:
URL: https://github.com/apache/pinot/pull/11607#discussion_r1329089094


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1065,4 +1066,14 @@ public enum JoinOverFlowMode {
       THROW, BREAK
     }
   }
+
+  public static class NullValuePlaceHolder {

Review Comment:
   we already have default null value in FieldSpecs do we plan to do 
differently here? should we consolidate? 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -117,31 +96,34 @@ public AggregateOperator(OpChainExecutionContext context, 
MultiStageOperator inp
       _aggCallSignatureMap = Collections.emptyMap();
     }
 
-    _hasReturnedAggregateBlock = false;
+    // Initialize the aggregation functions
     _colNameToIndexMap = new HashMap<>();
-
-    // Convert groupSet to ExpressionContext that our aggregation functions 
understand.
-    List<ExpressionContext> groupByExpr = getGroupSet(groupSet);
-
     List<FunctionContext> functionContexts = getFunctionContexts(aggCalls);
-    AggregationFunction[] aggFunctions = new 
AggregationFunction[functionContexts.size()];
-
-    for (int i = 0; i < functionContexts.size(); i++) {
+    int numFunctions = functionContexts.size();
+    AggregationFunction<?, ?>[] aggFunctions = new 
AggregationFunction[numFunctions];
+    for (int i = 0; i < numFunctions; i++) {
       aggFunctions[i] = 
AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), 
true);
     }
 
+    // Process the filter argument indices
+    int[] filterArgIds = new int[numFunctions];
+    int maxFilterArgId = -1;

Review Comment:
   why did we need this extra integer? isnt the filterArgIds array null/empty 
indicate theres no filter?



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -79,7 +79,7 @@ public void shouldHandleUpstreamErrorBlocks() {
     DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new 
ColumnDataType[]{INT, DOUBLE});
     AggregateOperator operator =
         new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, 
outSchema, inSchema, calls, group,
-            AggType.INTERMEDIATE, null, null);
+            AggType.DIRECT, Collections.singletonList(-1), null);

Review Comment:
   any specific reason we change the AggType here? 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -325,57 +291,126 @@ private ExpressionContext 
convertRexExpressionToExpressionContext(int aggIdx, in
     return exprContext;
   }
 
-  // TODO: If the previous block is not mailbox received, this method is not 
efficient.  Then getDataBlock() will
-  //  convert the unserialized format to serialized format of BaseDataBlock. 
Then it will convert it back to column
-  //  value primitive type.
-  static Map<ExpressionContext, BlockValSet> 
getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
-      DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int 
filterArgIdx) {
+  private int[] getGroupKeyIds(List<RexExpression> groupSet) {
+    int numKeys = groupSet.size();
+    int[] groupKeyIds = new int[numKeys];
+    for (int i = 0; i < numKeys; i++) {
+      RexExpression rexExp = groupSet.get(i);
+      Preconditions.checkState(rexExp.getKind() == SqlKind.INPUT_REF, "Group 
key must be an input reference, got: %s",
+          rexExp.getKind());
+      groupKeyIds[i] = ((RexExpression.InputRef) rexExp).getIndex();
+    }
+    return groupKeyIds;
+  }
+
+  static RoaringBitmap getMatchedBitmap(TransferableBlock block, int 
filterArgId) {
+    Preconditions.checkArgument(filterArgId >= 0, "Got negative filter 
argument id: %s", filterArgId);
+    RoaringBitmap matchedBitmap = new RoaringBitmap();
+    if (block.isContainerConstructed()) {
+      List<Object[]> rows = block.getContainer();
+      int numRows = rows.size();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        if ((int) rows.get(rowId)[filterArgId] == 1) {
+          matchedBitmap.add(rowId);
+        }
+      }
+    } else {
+      DataBlock dataBlock = block.getDataBlock();
+      int numRows = dataBlock.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        if (dataBlock.getInt(rowId, filterArgId) == 1) {
+          matchedBitmap.add(rowId);
+        }
+      }
+    }
+    return matchedBitmap;
+  }
+
+  static Map<ExpressionContext, BlockValSet> 
getBlockValSetMap(AggregationFunction<?, ?> aggFunction,
+      TransferableBlock block, Map<String, Integer> colNameToIndexMap) {
     List<ExpressionContext> expressions = aggFunction.getInputExpressions();
     int numExpressions = expressions.size();
     if (numExpressions == 0) {
       return Collections.emptyMap();
     }
-
     Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>();
-    for (ExpressionContext expression : expressions) {
-      if (expression.getType().equals(ExpressionContext.Type.IDENTIFIER) && 
!"__PLACEHOLDER__".equals(

Review Comment:
   i was originally intended to get rid of the `__PLACEHOLDER__`. let's factor 
this into a util or leave a TODO so that it's easier to replace in the future 
(i see 4 __PLACEHOLDER__ usage in the new impl)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -325,57 +291,126 @@ private ExpressionContext 
convertRexExpressionToExpressionContext(int aggIdx, in
     return exprContext;
   }
 
-  // TODO: If the previous block is not mailbox received, this method is not 
efficient.  Then getDataBlock() will
-  //  convert the unserialized format to serialized format of BaseDataBlock. 
Then it will convert it back to column
-  //  value primitive type.
-  static Map<ExpressionContext, BlockValSet> 
getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block,
-      DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int 
filterArgIdx) {
+  private int[] getGroupKeyIds(List<RexExpression> groupSet) {

Review Comment:
   nit: why reordering functions? this is original getGroupSet right? (and if 
not cant we remove the `convertRexExpressionToExpressionContext` func)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to