Jackie-Jiang commented on a change in pull request #6991:
URL: https://github.com/apache/incubator-pinot/pull/6991#discussion_r641733538
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,
+      int threshold, TableResizer tableResizer) {
+    // Check if a trim is necessary
+    int keyNum = 0;
+    if (_hasMVGroupByExpression) {
+      keyNum = _mvGroupKeys.length;
+    } else {
+      keyNum = _svGroupKeys.length;
+    }

Review comment:
This part is incorrect. `_svGroupKeys` and `_mvGroupKeys` are just buffers, and their length is always 10000. You might want to add an API in `GroupKeyGenerator` that returns the number of keys.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -249,6 +327,44 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
     Comparable extract(Record record);
   }

+  /**
+   * Helper class to store a subset of Record fields
+   * IntermediateRecord is derived from a Record
+   * Some of the main properties of an IntermediateRecord are:
+   *
+   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
+   * 2. For values, IntermediateRecord should only have the columns needed for order by
+   * 3. Inside the values, the columns should be ordered by the order by sequence
+   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * 5. There is an optional field to store the original record. Each segment can keep the intermediate result in this
+   *    form to prevent constructing IntermediateRecord again in the server.
+   */
+  public static class IntermediateRecord {

Review comment:
Make this a separate class, since it is shared by multiple classes.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -57,6 +58,7 @@
   private AggregationGroupByResult _aggregationGroupByResult;
   private List<Map<String, Object>> _combinedAggregationGroupByResult;
   private List<ProcessingException> _processingExceptions;
+  private Collection<TableResizer.IntermediateRecord> _intermediateCollection;

Review comment:
```suggestion
  private Collection<TableResizer.IntermediateRecord> _intermediateRecords;
```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -43,23 +46,29 @@
   private final ExpressionContext[] _groupByExpressions;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final int _inSegmentResultLimit;

Review comment:
`_inSegmentResultLimit` and `_enableSegmentGroupTrim` can be combined into a single `_trimSize`, using a negative value to denote that trim is off.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -49,6 +53,7 @@
   private final Map<ExpressionContext, Integer> _groupByExpressionIndexMap;
   private final AggregationFunction[] _aggregationFunctions;
   private final Map<FunctionContext, Integer> _aggregationFunctionIndexMap;
+  private final boolean _hasOrderBy;

Review comment:
We should only use `TableResizer` when the query has order-by.
Please check this on the caller side before creating the `TableResizer`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -214,23 +230,85 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
   }

   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for order by
-   * 3. Inside the values, the columns should be ordered by the order by sequence
-   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * Helper class to make an IntermediateRecord with the record.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  private IntermediateRecord getInSegmentIntermediateRecord(Key key, Record record) {

Review comment:
Remove this method and directly modify `getIntermediateRecord()`. The cached record can be used to simplify line 184: `recordsMap.get(recordToRetain._key)` -> `recordToRetain._record`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -214,23 +230,85 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
   }

   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for order by
-   * 3. Inside the values, the columns should be ordered by the order by sequence
-   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * Helper class to make an IntermediateRecord with the record.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  private IntermediateRecord getInSegmentIntermediateRecord(Key key, Record record) {
+    Comparable[] intermediateRecordValues = new Comparable[_numOrderByExpressions];
+    for (int i = 0; i < _numOrderByExpressions; i++) {
+      intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+    }
+    return new IntermediateRecord(key, intermediateRecordValues, record);
+  }

-    IntermediateRecord(Key key, Comparable[] values) {
-      _key = key;
-      _values = values;
+  /**
+   * Trim the aggregation results using a priority queue and returns a the priority queue.
+   * This method is to be called from individual segment if the intermediate results need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.
+   */
+  public PriorityQueue<IntermediateRecord> trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,
+      GroupByResultHolder[] _groupByResultHolders, int size) {
+    if (!groupKeyIterator.hasNext() || _groupByResultHolders.length == 0 || size == 0) {
+      return new PriorityQueue<>();
+    }
+    int numAggregationFunctions = _aggregationFunctions.length;
+    int numColumns = numAggregationFunctions + _numGroupByExpressions;
+
+    // Get comparator
+    Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
+    PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
+    while (groupKeyIterator.hasNext()) {
+      // Iterate over keys
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      Object[] keys = groupKey._keys;
+      Object[] values = Arrays.copyOf(keys, numColumns);
+      int groupId = groupKey._groupId;
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        values[_numGroupByExpressions + i] =
+            _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i], groupId);
+      }
+      // {key, intermediate_record, record}
+      IntermediateRecord intermediateRecord = getInSegmentIntermediateRecord(new Key(keys), new Record(values));
+      if (priorityQueue.size() < size) {
+        priorityQueue.offer(intermediateRecord);
+      } else {
+        IntermediateRecord peek = priorityQueue.peek();
+        if (comparator.compare(peek, intermediateRecord) < 0) {
+          priorityQueue.poll();
+          priorityQueue.offer(intermediateRecord);
+        }
+      }
     }
+    return priorityQueue;
+  }
+
+  /**
+   * Build a list of intermediate record and return the list.
+   * This method is to be called from individual segment if the intermediate results doesn't need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.
+   */
+  public List<IntermediateRecord> buildInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,

Review comment:
When trim is off, we should avoid creating the list, because it could lead to memory issues.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,
+      int threshold, TableResizer tableResizer) {

Review comment:
```suggestion
      int trimSize, TableResizer tableResizer) {
```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -117,6 +119,19 @@ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
     _dataSchema = dataSchema;
   }

+  /**
+   * Constructor for aggregation group-by order-by result with {@link AggregationGroupByResult} and
+   * with a collection of intermediate records.
+   */
+  public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
+      @Nullable AggregationGroupByResult aggregationGroupByResults,

Review comment:
Do we need to pass `aggregationGroupByResults` here? Also, the intermediate records should never be null.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,

Review comment:
Remove `enableSegmentGroupTrim`. When this method is invoked, it should always trim the result.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -51,40 +51,56 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity;
+
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
   // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
   // Used for SQL GROUP BY (server combine)
   private final int _groupByTrimThreshold;
+  private final boolean _enableSegmentGroupTrim;
+
+  @VisibleForTesting
+  private int _inSegmentTrimLimit = -1;

Review comment:
Since we need it for testing purposes, let's directly make it configurable as `_minSegmentTrimSize` and default it to 5000.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -281,6 +296,16 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
     _numGroupsLimitReached = numGroupsLimitReached;
   }

+  /**
+   * Get an iterator for the intermediate record collection. Should only be called if _intermediateCollection is present
+   */
+  public Iterator<TableResizer.IntermediateRecord> getIntermediateResultIterator() {

Review comment:
Directly return the collection instead of the iterator, and annotate it with `@Nullable`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -106,8 +116,13 @@ protected IntermediateResultsBlock getNextBlock() {
       groupByExecutor.process(transformBlock);
     }

+    if (!_tableResizer.getOrderByStatus()) {
+      return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    }
+    Collection<TableResizer.IntermediateRecord> intermediate =
+        groupByExecutor.trimGroupByResult(_enableSegmentGroupTrim, _inSegmentResultLimit, _tableResizer);
     // Build intermediate result block based on aggregation group-by result from the executor
-    return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), intermediate, _dataSchema);

Review comment:
Fall back to the old behavior when there is no segment trim.
Keeping a list can increase the memory overhead.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -85,6 +94,7 @@ public AggregationGroupByOrderByOperator(AggregationFunction[] aggregationFuncti
     }

     _dataSchema = new DataSchema(columnNames, columnDataTypes);
+    _tableResizer = new TableResizer(_dataSchema, queryContext);

Review comment:
Don't create the table resizer here; create it in `getNextBlock()` only when needed.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -137,8 +206,15 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer
     QueryOptions queryOptions = new QueryOptions(queryContext.getQueryOptions());
     // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
     if (queryOptions.isGroupByModeSQL()) {
+      // Calculate trim limit = max(limit * 5, 5000)

Review comment:
Don't calculate it here; move it into `AggregationGroupByOrderByPlanNode`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -105,8 +121,61 @@ public InstancePlanMakerImplV2(QueryExecutorConfig queryExecutorConfig) {
     Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
         "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
-    LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}",
-        _maxInitialResultHolderCapacity, _numGroupsLimit);
+    _enableSegmentGroupTrim =
+        queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, DEFAULT_ENABLE_SEGMENT_GROUP_TRIM);
+    LOGGER.info(
+        "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, enableSegmentTrim: {}",
+        _maxInitialResultHolderCapacity, _numGroupsLimit, _enableSegmentGroupTrim);
+  }
+
+  /**
+   * Returns {@code true} if the given aggregation-only without filter QueryContext can be solved with segment metadata,
+   * {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
+   */
+  @VisibleForTesting
+  static boolean isFitForMetadataBasedPlan(QueryContext queryContext) {

Review comment:
Why move these 2 methods? This seems unrelated.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -43,21 +43,27 @@
   private final IndexSegment _indexSegment;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final boolean _enableSegmentGroupTrim;
+  private final int _inSegmentTrimLimit;

Review comment:
Change it to `_minSegmentTrimSize` with a default of 5000; then we can pass it to `GroupByUtils` to calculate the actual trim size.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
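The segment-level trim discussed in this review can be illustrated with a small, self-contained sketch. Everything in it is hypothetical and is not a Pinot API.

- `SegmentTrimSketch` and `GroupRecord` (a stand-in for `IntermediateRecord` that carries a single order-by value) are invented names.
- The trim size follows the `max(limit * 5, 5000)` rule from the `InstancePlanMakerImplV2` comment, with `5000` passed in as `minSegmentTrimSize`. A negative value means trim is off, as suggested for `_trimSize`.
- `needsTrim` assumes the group count comes from a key-count API like the one proposed for `GroupKeyGenerator`, not from buffer lengths.

The bounded priority queue mirrors `trimInSegmentResults`. Its head is the worst record kept so far, so each new record only has to be compared against it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of segment-level group-by trimming; none of these names are Pinot APIs.
public class SegmentTrimSketch {

  // Stand-in for IntermediateRecord: a group key plus the single value the query orders by.
  public static final class GroupRecord {
    private final String key;
    private final long orderByValue;

    public GroupRecord(String key, long orderByValue) {
      this.key = key;
      this.orderByValue = orderByValue;
    }

    public long orderByValue() {
      return orderByValue;
    }

    @Override
    public String toString() {
      return key + "=" + orderByValue;
    }
  }

  // Returns -1 when trimming is disabled, otherwise max(limit * 5, minSegmentTrimSize),
  // computed in long arithmetic and capped at Integer.MAX_VALUE to avoid overflow.
  public static int computeTrimSize(boolean enableSegmentGroupTrim, int limit, int minSegmentTrimSize) {
    if (!enableSegmentGroupTrim) {
      return -1;
    }
    long size = Math.max(5L * limit, minSegmentTrimSize);
    return (int) Math.min(size, Integer.MAX_VALUE);
  }

  // Trimming only pays off when it is enabled and the segment produced more groups than we keep.
  public static boolean needsTrim(int numGroups, int trimSize) {
    return trimSize >= 0 && numGroups > trimSize;
  }

  // Keeps the trimSize records that sort first under orderBy. The heap uses the reversed
  // comparator, so its head is the worst record kept so far.
  public static List<GroupRecord> trim(Iterable<GroupRecord> records, Comparator<GroupRecord> orderBy,
      int trimSize) {
    List<GroupRecord> result = new ArrayList<>();
    if (trimSize <= 0) {
      return result; // PriorityQueue rejects an initial capacity below 1
    }
    PriorityQueue<GroupRecord> heap = new PriorityQueue<>(trimSize, orderBy.reversed());
    for (GroupRecord record : records) {
      if (heap.size() < trimSize) {
        heap.offer(record);
      } else if (orderBy.compare(record, heap.peek()) < 0) {
        heap.poll(); // evict the current worst record
        heap.offer(record);
      }
    }
    result.addAll(heap);
    result.sort(orderBy); // heap iteration order is unspecified, so sort before returning
    return result;
  }

  public static void main(String[] args) {
    Comparator<GroupRecord> byValueAsc = Comparator.comparingLong(GroupRecord::orderByValue);
    Comparator<GroupRecord> byValueDesc = byValueAsc.reversed(); // e.g. ORDER BY SUM(x) DESC
    List<GroupRecord> groups = Arrays.asList(new GroupRecord("a", 3), new GroupRecord("b", 9),
        new GroupRecord("c", 1), new GroupRecord("d", 7), new GroupRecord("e", 5));

    System.out.println(computeTrimSize(true, 10, 5000)); // 5000
    int trimSize = 2; // deliberately tiny so the example actually trims
    if (needsTrim(groups.size(), trimSize)) {
      System.out.println(trim(groups, byValueDesc, trimSize)); // [b=9, d=7]
    }
  }
}
```

The heap never holds more than `trimSize` records, so per-segment memory stays bounded. When `needsTrim` returns false, a caller would skip `trim` entirely, in line with the suggestion to fall back to the old behavior when there is no segment trim.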