walterddr commented on code in PR #11607: URL: https://github.com/apache/pinot/pull/11607#discussion_r1329089094
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -1065,4 +1066,14 @@ public enum JoinOverFlowMode { THROW, BREAK } } + + public static class NullValuePlaceHolder { Review Comment: We already have default null values in FieldSpecs. Do we plan to handle them differently here? Should we consolidate? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java: ########## @@ -117,31 +96,34 @@ public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inp _aggCallSignatureMap = Collections.emptyMap(); } - _hasReturnedAggregateBlock = false; + // Initialize the aggregation functions _colNameToIndexMap = new HashMap<>(); - - // Convert groupSet to ExpressionContext that our aggregation functions understand. - List<ExpressionContext> groupByExpr = getGroupSet(groupSet); - List<FunctionContext> functionContexts = getFunctionContexts(aggCalls); - AggregationFunction[] aggFunctions = new AggregationFunction[functionContexts.size()]; - - for (int i = 0; i < functionContexts.size(); i++) { + int numFunctions = functionContexts.size(); + AggregationFunction<?, ?>[] aggFunctions = new AggregationFunction[numFunctions]; + for (int i = 0; i < numFunctions; i++) { aggFunctions[i] = AggregationFunctionFactory.getAggregationFunction(functionContexts.get(i), true); } + // Process the filter argument indices + int[] filterArgIds = new int[numFunctions]; + int maxFilterArgId = -1; Review Comment: Why do we need this extra integer? Doesn't a null/empty filterArgIds array already indicate that there's no filter? 
########## pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java: ########## @@ -79,7 +79,7 @@ public void shouldHandleUpstreamErrorBlocks() { DataSchema outSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE}); AggregateOperator operator = new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, inSchema, calls, group, - AggType.INTERMEDIATE, null, null); + AggType.DIRECT, Collections.singletonList(-1), null); Review Comment: Is there any specific reason we changed the AggType here? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java: ########## @@ -325,57 +291,126 @@ private ExpressionContext convertRexExpressionToExpressionContext(int aggIdx, in return exprContext; } - // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will - // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column - // value primitive type. 
- static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block, - DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) { + private int[] getGroupKeyIds(List<RexExpression> groupSet) { + int numKeys = groupSet.size(); + int[] groupKeyIds = new int[numKeys]; + for (int i = 0; i < numKeys; i++) { + RexExpression rexExp = groupSet.get(i); + Preconditions.checkState(rexExp.getKind() == SqlKind.INPUT_REF, "Group key must be an input reference, got: %s", + rexExp.getKind()); + groupKeyIds[i] = ((RexExpression.InputRef) rexExp).getIndex(); + } + return groupKeyIds; + } + + static RoaringBitmap getMatchedBitmap(TransferableBlock block, int filterArgId) { + Preconditions.checkArgument(filterArgId >= 0, "Got negative filter argument id: %s", filterArgId); + RoaringBitmap matchedBitmap = new RoaringBitmap(); + if (block.isContainerConstructed()) { + List<Object[]> rows = block.getContainer(); + int numRows = rows.size(); + for (int rowId = 0; rowId < numRows; rowId++) { + if ((int) rows.get(rowId)[filterArgId] == 1) { + matchedBitmap.add(rowId); + } + } + } else { + DataBlock dataBlock = block.getDataBlock(); + int numRows = dataBlock.getNumberOfRows(); + for (int rowId = 0; rowId < numRows; rowId++) { + if (dataBlock.getInt(rowId, filterArgId) == 1) { + matchedBitmap.add(rowId); + } + } + } + return matchedBitmap; + } + + static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction<?, ?> aggFunction, + TransferableBlock block, Map<String, Integer> colNameToIndexMap) { List<ExpressionContext> expressions = aggFunction.getInputExpressions(); int numExpressions = expressions.size(); if (numExpressions == 0) { return Collections.emptyMap(); } - Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>(); - for (ExpressionContext expression : expressions) { - if (expression.getType().equals(ExpressionContext.Type.IDENTIFIER) && !"__PLACEHOLDER__".equals( Review 
Comment: I originally intended to get rid of `__PLACEHOLDER__`. Let's factor this into a util or leave a TODO so that it's easier to replace in the future (I see 4 `__PLACEHOLDER__` usages in the new impl). ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java: ########## @@ -325,57 +291,126 @@ private ExpressionContext convertRexExpressionToExpressionContext(int aggIdx, in return exprContext; } - // TODO: If the previous block is not mailbox received, this method is not efficient. Then getDataBlock() will - // convert the unserialized format to serialized format of BaseDataBlock. Then it will convert it back to column - // value primitive type. - static Map<ExpressionContext, BlockValSet> getBlockValSetMap(AggregationFunction aggFunction, TransferableBlock block, - DataSchema inputDataSchema, Map<String, Integer> colNameToIndexMap, int filterArgIdx) { + private int[] getGroupKeyIds(List<RexExpression> groupSet) { Review Comment: nit: why reorder the functions? This is the original getGroupSet, right? (And if not, can't we remove the `convertRexExpressionToExpressionContext` func?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org