Jackie-Jiang commented on a change in pull request #6991: URL: https://github.com/apache/incubator-pinot/pull/6991#discussion_r644984885
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +/** + * Helper class to store a subset of Record fields + * IntermediateRecord is derived from a Record + * Some of the main properties of an IntermediateRecord are: + * + * 1. Key in IntermediateRecord is expected to be identical to the one in the Record + * 2. For values, IntermediateRecord should only have the columns needed for order by + * 3. Inside the values, the columns should be ordered by the order by sequence + * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable Review comment: ```suggestion * 4. 
For order by on aggregations, final results are extracted ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java ########## @@ -93,6 +93,7 @@ private final int _globalGroupIdUpperBound; private final RawKeyHolder _rawKeyHolder; + private int _keyNum; Review comment: Remove ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java ########## @@ -203,34 +206,55 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) { } int numRecordsToRetain = Math.min(numRecords, trimToSize); // make PQ of sorted records to retain - PriorityQueue<IntermediateRecord> priorityQueue = convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); + PriorityQueue<IntermediateRecord> priorityQueue = + convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); Record[] sortedArray = new Record[numRecordsToRetain]; while (!priorityQueue.isEmpty()) { IntermediateRecord intermediateRecord = priorityQueue.poll(); - Record record = recordsMap.get(intermediateRecord._key); - sortedArray[--numRecordsToRetain] = record; + sortedArray[--numRecordsToRetain] = intermediateRecord._record;; } return Arrays.asList(sortedArray); } /** - * Helper class to store a subset of Record fields - * IntermediateRecord is derived from a Record - * Some of the main properties of an IntermediateRecord are: - * - * 1. Key in IntermediateRecord is expected to be identical to the one in the Record - * 2. For values, IntermediateRecord should only have the columns needed for order by - * 3. Inside the values, the columns should be ordered by the order by sequence - * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable + * Trim the aggregation results using a priority queue and returns a the priority queue. 
+ * This method is to be called from individual segment if the intermediate results need to be trimmed. + * The use case now is Multi-Segment GroupBy OrderBy query. */ - private static class IntermediateRecord { - final Key _key; - final Comparable[] _values; + public PriorityQueue<IntermediateRecord> trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator, + GroupByResultHolder[] _groupByResultHolders, int size) { + if (!groupKeyIterator.hasNext() || _groupByResultHolders.length == 0 || size == 0) { Review comment: These checks should have already been performed on the caller side ########## File path: pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java ########## @@ -65,6 +76,33 @@ public void setUp() { new Key(new Object[]{"c", 50, 4.0}), new Key(new Object[]{"c", 300, 5.0}) ); + + // GroupKeys: d1: a0 ~ a14, d2: 10.0 ~ 24.0, d3: 1.0 ~ 15.0 + _groupKeys = new LinkedList<>(); + for (int i = 0; i < 15; ++i) { + GroupKeyGenerator.GroupKey groupKey = new GroupKeyGenerator.GroupKey(); + groupKey._keys = new Object[]{"a" + i, 10.0 + i, 1.0 + i}; + groupKey._groupId = i; + + _groupKeys.add(groupKey); + } + + // Aggregation results: sum(m1) = 10 % d3, max(m2) = 100 % d3, + // distinctcount(m3) = set(new int[]{j}), avg(m4) = 10 / (j + 1) + _groupByResultHolders = new GroupByResultHolder[NUM_RESULT_HOLDER]; + _groupByResultHolders[0] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0); + _groupByResultHolders[1] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0); + _groupByResultHolders[2] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size()); + _groupByResultHolders[3] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size()); + for (int j = 0; j < _groupKeys.size(); ++j) { + _groupByResultHolders[0] + .setValueForKey(_groupKeys.get(j)._groupId, 10 % ((Double) _groupKeys.get(j)._keys[2])); + _groupByResultHolders[1] + 
.setValueForKey(_groupKeys.get(j)._groupId, 100 % ((Double) _groupKeys.get(j)._keys[2])); + _groupByResultHolders[2].setValueForKey(_groupKeys.get(j)._groupId, new IntOpenHashSet(new int[]{j})); + _groupByResultHolders[3].setValueForKey(_groupKeys.get(j)._groupId, new AvgPair(10, j + 1)); + } + //@formatter:on Review comment: Move this line to line 79 ########## File path: pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java ########## @@ -72,6 +73,46 @@ public void testGroupByOrderByPQLResponse(String query, List<String[]> expectedG expectedValues, true); } + /** + * Tests the in-segment trim option for GroupBy OrderBy query + */ + @Test(dataProvider = "orderBySQLResultTableProvider") + public void testGroupByOrderByTrimOptSQLLowLimitResponse(String query, List<Object[]> expectedResults, + long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, + long expectedNumTotalDocs, DataSchema expectedDataSchema) { + expectedResults = expectedResults.subList(0, expectedResults.size() / 2); Review comment: Same as the comments in `InterSegmentOrderByMultiValueQueriesTest` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java ########## @@ -36,6 +36,15 @@ public static int getTableCapacity(int limit) { return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT); } + /** + * Returns the capacity of the table required by the given query. 
+ * NOTE: It returns {@code max(limit * 5, resultLowerLimit)} where resultLowerLimit is configured by the user + * (Default: 5000) + */ + public static int getTableCapacity(int limit, int resultLowerLimit) { Review comment: Also change `NUM_RESULTS_LOWER_LIMIT` to public `DEFAULT_MIN_NUM_GROUPS` which can be accessed by the plan maker ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -52,39 +52,56 @@ import org.slf4j.LoggerFactory; + /** * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}. */ public class InstancePlanMakerImplV2 implements PlanMaker { - private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); - public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; public static final String NUM_GROUPS_LIMIT = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; - + public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim"; + public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false; + public static final String SIZE_SEGMENT_GROUP_TRIM = "size.segment.group.trim"; + public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1; Review comment: ```suggestion public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "min.segment.group.trim.size"; public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1; ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +/** + * Helper class to store a subset of Record fields + * IntermediateRecord is derived from a Record + * Some of the main properties of an IntermediateRecord are: + * + * 1. Key in IntermediateRecord is expected to be identical to the one in the Record + * 2. For values, IntermediateRecord should only have the columns needed for order by + * 3. Inside the values, the columns should be ordered by the order by sequence + * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable + * 5. There is an optional field to store the original record. Each segment can keep the intermediate result in this + * form to prevent constructing IntermediateRecord again in the server. 
+ */ +public class IntermediateRecord { + public final Key _key; + public final Comparable[] _values; + public final Record _record; + + IntermediateRecord(Key key, Comparable[] values, Record record) { + _key = key; + _values = values; + _record = record; + } + Review comment: (nit) remove empty line ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java ########## @@ -203,34 +206,55 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) { } int numRecordsToRetain = Math.min(numRecords, trimToSize); // make PQ of sorted records to retain - PriorityQueue<IntermediateRecord> priorityQueue = convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); + PriorityQueue<IntermediateRecord> priorityQueue = + convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); Record[] sortedArray = new Record[numRecordsToRetain]; while (!priorityQueue.isEmpty()) { IntermediateRecord intermediateRecord = priorityQueue.poll(); - Record record = recordsMap.get(intermediateRecord._key); - sortedArray[--numRecordsToRetain] = record; + sortedArray[--numRecordsToRetain] = intermediateRecord._record;; } return Arrays.asList(sortedArray); } /** - * Helper class to store a subset of Record fields - * IntermediateRecord is derived from a Record - * Some of the main properties of an IntermediateRecord are: - * - * 1. Key in IntermediateRecord is expected to be identical to the one in the Record - * 2. For values, IntermediateRecord should only have the columns needed for order by - * 3. Inside the values, the columns should be ordered by the order by sequence - * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable + * Trim the aggregation results using a priority queue and returns a the priority queue. 
+ * This method is to be called from individual segment if the intermediate results need to be trimmed. + * The use case now is Multi-Segment GroupBy OrderBy query. Review comment: ```suggestion * Trims the aggregation results using a priority queue and returns the priority queue. * This method is to be called from individual segment if the intermediate results need to be trimmed. ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +/** + * Helper class to store a subset of Record fields + * IntermediateRecord is derived from a Record + * Some of the main properties of an IntermediateRecord are: + * + * 1. Key in IntermediateRecord is expected to be identical to the one in the Record + * 2. For values, IntermediateRecord should only have the columns needed for order by + * 3. Inside the values, the columns should be ordered by the order by sequence + * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable + * 5. There is an optional field to store the original record. 
Each segment can keep the intermediate result in this Review comment: The record is mandatory, not optional, right? ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -52,39 +52,56 @@ import org.slf4j.LoggerFactory; + /** * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}. */ public class InstancePlanMakerImplV2 implements PlanMaker { - private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); - public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; public static final String NUM_GROUPS_LIMIT = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; - + public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim"; + public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false; + public static final String SIZE_SEGMENT_GROUP_TRIM = "size.segment.group.trim"; + public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1; // set as pinot.server.query.executor.groupby.trim.threshold public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold"; public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; - + private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); private final int _maxInitialResultHolderCapacity; // Limit on number of groups stored for each segment, beyond which no new group will be created private final int _numGroupsLimit; // Used for SQL GROUP BY (server combine) private final int _groupByTrimThreshold; + private final boolean _enableSegmentGroupTrim; + private final int _minSegmentTrimSize; Review comment: ```suggestion private final int _minSegmentGroupTrimSize; ``` ########## File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java ########## @@ -144,4 +147,14 @@ protected void aggregate(TransformBlock transformBlock, int length, int function public AggregationGroupByResult getResult() { return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders); } + + @Override + public int getResultNum() { Review comment: ```suggestion public int getNumGroups() { ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java ########## @@ -281,6 +294,14 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) { _numGroupsLimitReached = numGroupsLimitReached; } + /** + * Get the collection of intermediate records + */ + @Nullable + public Collection<IntermediateRecord> getIntermediateResult() { Review comment: ```suggestion public Collection<IntermediateRecord> getIntermediateRecords() { ``` ########## File path: pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java ########## @@ -65,6 +76,33 @@ public void setUp() { new Key(new Object[]{"c", 50, 4.0}), new Key(new Object[]{"c", 300, 5.0}) ); + + // GroupKeys: d1: a0 ~ a14, d2: 10.0 ~ 24.0, d3: 1.0 ~ 15.0 + _groupKeys = new LinkedList<>(); Review comment: Let's make `_groupKeys` and `_groupByResultHolders` consistent with `_records` and `_keys` (5 groups, values in holder the same as in records) ########## File path: pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java ########## @@ -43,10 +49,15 @@ new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT}); private static final int 
TRIM_TO_SIZE = 3; + private static final int NUM_RESULT_HOLDER = 4; Review comment: We should be able to use the same `TRIM_TO_SIZE` for inner segment trim ########## File path: pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java ########## @@ -36,6 +36,15 @@ public static int getTableCapacity(int limit) { return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT); } + /** + * Returns the capacity of the table required by the given query. + * NOTE: It returns {@code max(limit * 5, resultLowerLimit)} where resultLowerLimit is configured by the user + * (Default: 5000) + */ + public static int getTableCapacity(int limit, int resultLowerLimit) { Review comment: ```suggestion public static int getTableCapacity(int limit, int minNumGroups) { ``` ########## File path: pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java ########## @@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, List<Object[]> expected expectedResults, expectedResults.size(), expectedDataSchema); } + /** + * Tests the in-segment trim option for GroupBy OrderBy query + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, List<Object[]> expectedResults, + long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) { + + expectedResults = expectedResults.subList(0, expectedResults.size() / 2); Review comment: I don't see the value of this test. 
This test only changes the limit of the query ########## File path: pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java ########## @@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, List<Object[]> expected expectedResults, expectedResults.size(), expectedDataSchema); } + /** + * Tests the in-segment trim option for GroupBy OrderBy query + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, List<Object[]> expectedResults, + long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) { + + expectedResults = expectedResults.subList(0, expectedResults.size() / 2); + + if (query.toUpperCase().contains("LIMIT")) { + String[] keyWords = query.split(" "); + keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size()); + query = String.join(" ", keyWords); + } else { + query += " LIMIT " + expectedResults.size(); + } + + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(expectedResults.size(), true); + BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker); + QueriesTestUtils + .testInterSegmentResultTable(brokerResponse, 400000L, 0, expectedNumEntriesScannedPostFilter, 400000L, + expectedResults, expectedResults.size(), expectedDataSchema); + } + + /** + * Tests the in-segment build option for GroupBy OrderBy query. 
(No trim) + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVTrimOptHighLimitSQLResults(String query, List<Object[]> expectedResults, Review comment: ```suggestion public void testGroupByOrderByMVSegmentTrimSQLResults(String query, List<Object[]> expectedResults, ``` ########## File path: pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java ########## @@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, List<Object[]> expected expectedResults, expectedResults.size(), expectedDataSchema); } + /** + * Tests the in-segment trim option for GroupBy OrderBy query + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, List<Object[]> expectedResults, + long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) { + + expectedResults = expectedResults.subList(0, expectedResults.size() / 2); + + if (query.toUpperCase().contains("LIMIT")) { + String[] keyWords = query.split(" "); + keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size()); + query = String.join(" ", keyWords); + } else { + query += " LIMIT " + expectedResults.size(); + } + + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(expectedResults.size(), true); + BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker); + QueriesTestUtils + .testInterSegmentResultTable(brokerResponse, 400000L, 0, expectedNumEntriesScannedPostFilter, 400000L, + expectedResults, expectedResults.size(), expectedDataSchema); + } + + /** + * Tests the in-segment build option for GroupBy OrderBy query. 
(No trim) + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVTrimOptHighLimitSQLResults(String query, List<Object[]> expectedResults, + long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) { + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(expectedResults.size() + 1, true); Review comment: Here we can simply put a very low `minSegmentGroupTrimSize` and expect the same result ```suggestion InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1, true); ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -52,39 +52,56 @@ import org.slf4j.LoggerFactory; + /** * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}. */ public class InstancePlanMakerImplV2 implements PlanMaker { - private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); - public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; public static final String NUM_GROUPS_LIMIT = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; - + public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim"; + public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false; + public static final String SIZE_SEGMENT_GROUP_TRIM = "size.segment.group.trim"; + public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1; // set as pinot.server.query.executor.groupby.trim.threshold public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold"; public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; - + private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); Review comment: Move this line back for better readability ########## File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java ########## @@ -72,6 +73,46 @@ public void testGroupByOrderByPQLResponse(String query, List<String[]> expectedG expectedValues, true); } + /** + * Tests the in-segment trim option for GroupBy OrderBy query + */ + @Test(dataProvider = "orderBySQLResultTableProvider") + public void testGroupByOrderByTrimOptSQLLowLimitResponse(String query, List<Object[]> expectedResults, + long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, + long expectedNumTotalDocs, DataSchema expectedDataSchema) { + expectedResults = expectedResults.subList(0, expectedResults.size() / 2); + + if (query.toUpperCase().contains("LIMIT")) { + String[] keyWords = query.split(" "); + keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size()); + query = String.join(" ", keyWords); + } else { + query += " LIMIT " + expectedResults.size(); + } + + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(expectedResults.size(), true); + BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker); + QueriesTestUtils + .testInterSegmentResultTable(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, expectedResults, expectedResults.size(), + expectedDataSchema); + } + + /** + * Tests the in-segment build option for GroupBy OrderBy query. 
(No trim) + */ + @Test(dataProvider = "orderBySQLResultTableProvider") + public void testGroupByOrderByTrimOptHighLimitSQLResponse(String query, List<Object[]> expectedResults, Review comment: Same as the comments in `InterSegmentOrderByMultiValueQueriesTest` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -105,8 +121,61 @@ public InstancePlanMakerImplV2(QueryExecutorConfig queryExecutorConfig) { Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit, "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d", _maxInitialResultHolderCapacity, _numGroupsLimit); - LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}", - _maxInitialResultHolderCapacity, _numGroupsLimit); + _enableSegmentGroupTrim = + queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, DEFAULT_ENABLE_SEGMENT_GROUP_TRIM); + LOGGER.info( + "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, enableSegmentTrim: {}", + _maxInitialResultHolderCapacity, _numGroupsLimit, _enableSegmentGroupTrim); + } + + /** + * Returns {@code true} if the given aggregation-only without filter QueryContext can be solved with segment metadata, + * {@code false} otherwise. + * <p>Aggregations supported: COUNT + */ + @VisibleForTesting + static boolean isFitForMetadataBasedPlan(QueryContext queryContext) { Review comment: Don't enable the `rearrange code` option during the reformat. 
It will introduce too much unnecessary change ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java ########## @@ -106,8 +116,14 @@ protected IntermediateResultsBlock getNextBlock() { groupByExecutor.process(transformBlock); } - // Build intermediate result block based on aggregation group-by result from the executor - return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema); + // Trim is off or no need to trim + if (_trimSize == TRIM_OFF || groupByExecutor.getResultNum() <= _trimSize) { Review comment: ```suggestion if (_trimSize <= 0 || groupByExecutor.getNumGroups() <= _trimSize) { ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -52,39 +52,56 @@ import org.slf4j.LoggerFactory; + /** * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}. */ public class InstancePlanMakerImplV2 implements PlanMaker { - private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); - public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; public static final String NUM_GROUPS_LIMIT = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; - + public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim"; + public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false; + public static final String SIZE_SEGMENT_GROUP_TRIM = "size.segment.group.trim"; + public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1; // set as pinot.server.query.executor.groupby.trim.threshold public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold"; public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; - + private static final Logger LOGGER 
= LoggerFactory.getLogger(InstancePlanMakerImplV2.class); private final int _maxInitialResultHolderCapacity; // Limit on number of groups stored for each segment, beyond which no new group will be created private final int _numGroupsLimit; // Used for SQL GROUP BY (server combine) private final int _groupByTrimThreshold; + private final boolean _enableSegmentGroupTrim; + private final int _minSegmentTrimSize; Review comment: As discussed offline, we can remove `_enableSegmentGroupTrim` and only maintain the trim size here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org