Jackie-Jiang commented on a change in pull request #6991:
URL: https://github.com/apache/incubator-pinot/pull/6991#discussion_r644984885



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+/**
+ * Helper class to store a subset of Record fields
+ * IntermediateRecord is derived from a Record
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. Key in IntermediateRecord is expected to be identical to the one in the 
Record
+ * 2. For values, IntermediateRecord should only have the columns needed for 
order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results should extracted if the 
intermediate result is non-comparable

Review comment:
       ```suggestion
    * 4. For order by on aggregations, final results are extracted
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
##########
@@ -93,6 +93,7 @@
 
   private final int _globalGroupIdUpperBound;
   private final RawKeyHolder _rawKeyHolder;
+  private int _keyNum;

Review comment:
       Remove

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -203,34 +206,55 @@ private IntermediateRecord getIntermediateRecord(Key key, 
Record record) {
     }
     int numRecordsToRetain = Math.min(numRecords, trimToSize);
     // make PQ of sorted records to retain
-    PriorityQueue<IntermediateRecord> priorityQueue = 
convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, 
_intermediateRecordComparator.reversed());
+    PriorityQueue<IntermediateRecord> priorityQueue =
+        convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, 
_intermediateRecordComparator.reversed());
     Record[] sortedArray = new Record[numRecordsToRetain];
     while (!priorityQueue.isEmpty()) {
       IntermediateRecord intermediateRecord = priorityQueue.poll();
-      Record record = recordsMap.get(intermediateRecord._key);
-      sortedArray[--numRecordsToRetain] = record;
+      sortedArray[--numRecordsToRetain] = intermediateRecord._record;;
     }
     return Arrays.asList(sortedArray);
   }
 
   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in 
the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for 
order by
-   * 3. Inside the values, the columns should be ordered by the order by 
sequence
-   * 4. For order by on aggregations, final results should extracted if the 
intermediate result is non-comparable
+   * Trim the aggregation results using a priority queue and returns a the 
priority queue.
+   * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  public PriorityQueue<IntermediateRecord> 
trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,
+      GroupByResultHolder[] _groupByResultHolders, int size) {
+    if (!groupKeyIterator.hasNext() || _groupByResultHolders.length == 0 || 
size == 0) {

Review comment:
       These checks should have already been performed on the caller side

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
##########
@@ -65,6 +76,33 @@ public void setUp() {
         new Key(new Object[]{"c", 50, 4.0}),
         new Key(new Object[]{"c", 300, 5.0})
     );
+
+    // GroupKeys: d1: a0 ~ a14, d2: 10.0 ~ 24.0, d3: 1.0 ~ 15.0
+    _groupKeys = new LinkedList<>();
+    for (int i = 0; i < 15; ++i) {
+      GroupKeyGenerator.GroupKey groupKey = new GroupKeyGenerator.GroupKey();
+      groupKey._keys = new Object[]{"a" + i, 10.0 + i, 1.0 + i};
+      groupKey._groupId = i;
+
+      _groupKeys.add(groupKey);
+    }
+
+    // Aggregation results: sum(m1) = 10 % d3, max(m2) = 100 % d3,
+    //                      distinctcount(m3) = set(new int[]{j}), avg(m4) = 
10 / (j + 1)
+    _groupByResultHolders = new GroupByResultHolder[NUM_RESULT_HOLDER];
+    _groupByResultHolders[0] = new 
DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+    _groupByResultHolders[1] = new 
DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+    _groupByResultHolders[2] = new 
ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+    _groupByResultHolders[3] = new 
ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+    for (int j = 0; j < _groupKeys.size(); ++j) {
+      _groupByResultHolders[0]
+          .setValueForKey(_groupKeys.get(j)._groupId, 10 % ((Double) 
_groupKeys.get(j)._keys[2]));
+      _groupByResultHolders[1]
+          .setValueForKey(_groupKeys.get(j)._groupId, 100 % ((Double) 
_groupKeys.get(j)._keys[2]));
+      _groupByResultHolders[2].setValueForKey(_groupKeys.get(j)._groupId, new 
IntOpenHashSet(new int[]{j}));
+      _groupByResultHolders[3].setValueForKey(_groupKeys.get(j)._groupId, new 
AvgPair(10, j + 1));
+    }
+
     //@formatter:on

Review comment:
       Move this line to line 79

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
##########
@@ -72,6 +73,46 @@ public void testGroupByOrderByPQLResponse(String query, 
List<String[]> expectedG
         expectedValues, true);
   }
 
+  /**
+   * Tests the in-segment trim option for GroupBy OrderBy query
+   */
+  @Test(dataProvider = "orderBySQLResultTableProvider")
+  public void testGroupByOrderByTrimOptSQLLowLimitResponse(String query, 
List<Object[]> expectedResults,
+      long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, 
long expectedNumEntriesScannedPostFilter,
+      long expectedNumTotalDocs, DataSchema expectedDataSchema) {
+    expectedResults = expectedResults.subList(0, expectedResults.size() / 2);

Review comment:
       Same as the comments in `InterSegmentOrderByMultiValueQueriesTest`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
##########
@@ -36,6 +36,15 @@ public static int getTableCapacity(int limit) {
     return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
   }
 
+  /**
+   * Returns the capacity of the table required by the given query.
+   * NOTE: It returns {@code max(limit * 5, resultLowerLimit)} where 
resultLowerLimit is configured by the user
+   *      (Default: 5000)
+   */
+  public static int getTableCapacity(int limit, int resultLowerLimit) {

Review comment:
       Also change `NUM_RESULTS_LOWER_LIMIT` to public `DEFAULT_MIN_NUM_GROUPS` 
which can be accessed by the plan maker

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -52,39 +52,56 @@
 import org.slf4j.LoggerFactory;
 
 
+
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default 
implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = 
"max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = 
"enable.segment.group.trim";
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+  public static final String SIZE_SEGMENT_GROUP_TRIM = 
"size.segment.group.trim";
+  public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1;

Review comment:
       ```suggestion
     public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = 
"min.segment.group.trim.size";
     public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1;
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+/**
+ * Helper class to store a subset of Record fields
+ * IntermediateRecord is derived from a Record
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. Key in IntermediateRecord is expected to be identical to the one in the 
Record
+ * 2. For values, IntermediateRecord should only have the columns needed for 
order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results should extracted if the 
intermediate result is non-comparable
+ * 5. There is an optional field to store the original record. Each segment 
can keep the intermediate result in this
+ * form to prevent constructing IntermediateRecord again in the server.
+ */
+public class IntermediateRecord {
+  public final Key _key;
+  public final Comparable[] _values;
+  public final Record _record;
+
+  IntermediateRecord(Key key, Comparable[] values, Record record) {
+    _key = key;
+    _values = values;
+    _record = record;
+  }
+

Review comment:
       (nit) remove empty line

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -203,34 +206,55 @@ private IntermediateRecord getIntermediateRecord(Key key, 
Record record) {
     }
     int numRecordsToRetain = Math.min(numRecords, trimToSize);
     // make PQ of sorted records to retain
-    PriorityQueue<IntermediateRecord> priorityQueue = 
convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, 
_intermediateRecordComparator.reversed());
+    PriorityQueue<IntermediateRecord> priorityQueue =
+        convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, 
_intermediateRecordComparator.reversed());
     Record[] sortedArray = new Record[numRecordsToRetain];
     while (!priorityQueue.isEmpty()) {
       IntermediateRecord intermediateRecord = priorityQueue.poll();
-      Record record = recordsMap.get(intermediateRecord._key);
-      sortedArray[--numRecordsToRetain] = record;
+      sortedArray[--numRecordsToRetain] = intermediateRecord._record;;
     }
     return Arrays.asList(sortedArray);
   }
 
   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in 
the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for 
order by
-   * 3. Inside the values, the columns should be ordered by the order by 
sequence
-   * 4. For order by on aggregations, final results should extracted if the 
intermediate result is non-comparable
+   * Trim the aggregation results using a priority queue and returns a the 
priority queue.
+   * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.

Review comment:
       ```suggestion
      * Trims the aggregation results using a priority queue and returns the 
priority queue.
      * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+/**
+ * Helper class to store a subset of Record fields
+ * IntermediateRecord is derived from a Record
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. Key in IntermediateRecord is expected to be identical to the one in the 
Record
+ * 2. For values, IntermediateRecord should only have the columns needed for 
order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results should extracted if the 
intermediate result is non-comparable
+ * 5. There is an optional field to store the original record. Each segment 
can keep the intermediate result in this

Review comment:
       The record is mandatory, not optional, right?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -52,39 +52,56 @@
 import org.slf4j.LoggerFactory;
 
 
+
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default 
implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = 
"max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = 
"enable.segment.group.trim";
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+  public static final String SIZE_SEGMENT_GROUP_TRIM = 
"size.segment.group.trim";
+  public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
   // Limit on number of groups stored for each segment, beyond which no new 
group will be created
   private final int _numGroupsLimit;
   // Used for SQL GROUP BY (server combine)
   private final int _groupByTrimThreshold;
+  private final boolean _enableSegmentGroupTrim;
+  private final int _minSegmentTrimSize;

Review comment:
       ```suggestion
     private final int _minSegmentGroupTrimSize;
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,14 @@ protected void aggregate(TransformBlock transformBlock, 
int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, 
_aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public int getResultNum() {

Review comment:
       ```suggestion
     public int getNumGroups() {
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -281,6 +294,14 @@ public void setNumGroupsLimitReached(boolean 
numGroupsLimitReached) {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  /**
+   * Get the collection of intermediate records
+   */
+  @Nullable
+  public Collection<IntermediateRecord> getIntermediateResult() {

Review comment:
       ```suggestion
     public Collection<IntermediateRecord> getIntermediateRecords() {
   ```

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
##########
@@ -65,6 +76,33 @@ public void setUp() {
         new Key(new Object[]{"c", 50, 4.0}),
         new Key(new Object[]{"c", 300, 5.0})
     );
+
+    // GroupKeys: d1: a0 ~ a14, d2: 10.0 ~ 24.0, d3: 1.0 ~ 15.0
+    _groupKeys = new LinkedList<>();

Review comment:
       Let's make `_groupKeys` and `_groupByResultHolders` consistent with 
`_records` and `_keys` (5 groups, values in holder the same as in records)

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
##########
@@ -43,10 +49,15 @@
       new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", 
"distinctcount(m3)", "avg(m4)"},
           new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT});
   private static final int TRIM_TO_SIZE = 3;
+  private static final int NUM_RESULT_HOLDER = 4;

Review comment:
       We should be able to use the same `TRIM_TO_SIZE` for inner segment trim

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
##########
@@ -36,6 +36,15 @@ public static int getTableCapacity(int limit) {
     return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
   }
 
+  /**
+   * Returns the capacity of the table required by the given query.
+   * NOTE: It returns {@code max(limit * 5, resultLowerLimit)} where 
resultLowerLimit is configured by the user
+   *      (Default: 5000)
+   */
+  public static int getTableCapacity(int limit, int resultLowerLimit) {

Review comment:
       ```suggestion
     public static int getTableCapacity(int limit, int minNumGroups) {
   ```

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
##########
@@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, 
List<Object[]> expected
             expectedResults, expectedResults.size(), expectedDataSchema);
   }
 
+  /**
+   * Tests the in-segment trim option for GroupBy OrderBy query
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, 
List<Object[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) 
{
+
+    expectedResults = expectedResults.subList(0, expectedResults.size() / 2);

Review comment:
       I don't see the value of this test. It only changes the limit of 
the query.

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
##########
@@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, 
List<Object[]> expected
             expectedResults, expectedResults.size(), expectedDataSchema);
   }
 
+  /**
+   * Tests the in-segment trim option for GroupBy OrderBy query
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, 
List<Object[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) 
{
+
+    expectedResults = expectedResults.subList(0, expectedResults.size() / 2);
+
+    if (query.toUpperCase().contains("LIMIT")) {
+      String[] keyWords = query.split(" ");
+      keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size());
+      query = String.join(" ", keyWords);
+    } else {
+      query += " LIMIT " + expectedResults.size();
+    }
+
+    InstancePlanMakerImplV2 planMaker = new 
InstancePlanMakerImplV2(expectedResults.size(), true);
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, 
planMaker);
+    QueriesTestUtils
+        .testInterSegmentResultTable(brokerResponse, 400000L, 0, 
expectedNumEntriesScannedPostFilter, 400000L,
+            expectedResults, expectedResults.size(), expectedDataSchema);
+  }
+
+  /**
+   * Tests the in-segment build option for GroupBy OrderBy query. (No trim)
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVTrimOptHighLimitSQLResults(String query, 
List<Object[]> expectedResults,

Review comment:
       ```suggestion
     public void testGroupByOrderByMVSegmentTrimSQLResults(String query, 
List<Object[]> expectedResults,
   ```

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
##########
@@ -41,6 +42,43 @@ public void testGroupByOrderByMVSQLResults(String query, 
List<Object[]> expected
             expectedResults, expectedResults.size(), expectedDataSchema);
   }
 
+  /**
+   * Tests the in-segment trim option for GroupBy OrderBy query
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVTrimOptLowLimitSQLResults(String query, 
List<Object[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) 
{
+
+    expectedResults = expectedResults.subList(0, expectedResults.size() / 2);
+
+    if (query.toUpperCase().contains("LIMIT")) {
+      String[] keyWords = query.split(" ");
+      keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size());
+      query = String.join(" ", keyWords);
+    } else {
+      query += " LIMIT " + expectedResults.size();
+    }
+
+    InstancePlanMakerImplV2 planMaker = new 
InstancePlanMakerImplV2(expectedResults.size(), true);
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, 
planMaker);
+    QueriesTestUtils
+        .testInterSegmentResultTable(brokerResponse, 400000L, 0, 
expectedNumEntriesScannedPostFilter, 400000L,
+            expectedResults, expectedResults.size(), expectedDataSchema);
+  }
+
+  /**
+   * Tests the in-segment build option for GroupBy OrderBy query. (No trim)
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVTrimOptHighLimitSQLResults(String query, 
List<Object[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) 
{
+    InstancePlanMakerImplV2 planMaker = new 
InstancePlanMakerImplV2(expectedResults.size() + 1, true);

Review comment:
       Here we can simply put a very low `minSegmentGroupTrimSize` and expect 
the same result
   ```suggestion
       InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1, true);
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -52,39 +52,56 @@
 import org.slf4j.LoggerFactory;
 
 
+
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default 
implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = 
"max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = 
"enable.segment.group.trim";
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+  public static final String SIZE_SEGMENT_GROUP_TRIM = 
"size.segment.group.trim";
+  public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

Review comment:
       Move this line back for better readability

##########
File path: 
pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
##########
@@ -72,6 +73,46 @@ public void testGroupByOrderByPQLResponse(String query, 
List<String[]> expectedG
         expectedValues, true);
   }
 
+  /**
+   * Tests the in-segment trim option for GroupBy OrderBy query
+   */
+  @Test(dataProvider = "orderBySQLResultTableProvider")
+  public void testGroupByOrderByTrimOptSQLLowLimitResponse(String query, 
List<Object[]> expectedResults,
+      long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, 
long expectedNumEntriesScannedPostFilter,
+      long expectedNumTotalDocs, DataSchema expectedDataSchema) {
+    expectedResults = expectedResults.subList(0, expectedResults.size() / 2);
+
+    if (query.toUpperCase().contains("LIMIT")) {
+      String[] keyWords = query.split(" ");
+      keyWords[keyWords.length - 1] = String.valueOf(expectedResults.size());
+      query = String.join(" ", keyWords);
+    } else {
+      query += " LIMIT " + expectedResults.size();
+    }
+
+    InstancePlanMakerImplV2 planMaker = new 
InstancePlanMakerImplV2(expectedResults.size(), true);
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, 
planMaker);
+    QueriesTestUtils
+        .testInterSegmentResultTable(brokerResponse, expectedNumDocsScanned, 
expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, 
expectedResults, expectedResults.size(),
+            expectedDataSchema);
+  }
+
+  /**
+   * Tests the in-segment build option for GroupBy OrderBy query. (No trim)
+   */
+  @Test(dataProvider = "orderBySQLResultTableProvider")
+  public void testGroupByOrderByTrimOptHighLimitSQLResponse(String query, 
List<Object[]> expectedResults,

Review comment:
       Same as the comments in `InterSegmentOrderByMultiValueQueriesTest`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -105,8 +121,61 @@ public InstancePlanMakerImplV2(QueryExecutorConfig 
queryExecutorConfig) {
     Preconditions.checkState(_maxInitialResultHolderCapacity <= 
_numGroupsLimit,
         "Invalid configuration: maxInitialResultHolderCapacity: %d must be 
smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
-    LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: 
{}, numGroupsLimit: {}",
-        _maxInitialResultHolderCapacity, _numGroupsLimit);
+    _enableSegmentGroupTrim =
+        queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, 
DEFAULT_ENABLE_SEGMENT_GROUP_TRIM);
+    LOGGER.info(
+        "Initializing plan maker with maxInitialResultHolderCapacity: {}, 
numGroupsLimit: {}, enableSegmentTrim: {}",
+        _maxInitialResultHolderCapacity, _numGroupsLimit, 
_enableSegmentGroupTrim);
+  }
+
+  /**
+   * Returns {@code true} if the given aggregation-only without filter 
QueryContext can be solved with segment metadata,
+   * {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
+   */
+  @VisibleForTesting
+  static boolean isFitForMetadataBasedPlan(QueryContext queryContext) {

Review comment:
       Don't enable the `rearrange code` option when reformatting. It will 
introduce too many unnecessary changes.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -106,8 +116,14 @@ protected IntermediateResultsBlock getNextBlock() {
       groupByExecutor.process(transformBlock);
     }
 
-    // Build intermediate result block based on aggregation group-by result 
from the executor
-    return new IntermediateResultsBlock(_aggregationFunctions, 
groupByExecutor.getResult(), _dataSchema);
+    // Trim is off or no need to trim
+    if (_trimSize == TRIM_OFF || groupByExecutor.getResultNum() <= _trimSize) {

Review comment:
       ```suggestion
       if (_trimSize <= 0 || groupByExecutor.getNumGroups() <= _trimSize) {
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -52,39 +52,56 @@
 import org.slf4j.LoggerFactory;
 
 
+
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default 
implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = 
"max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = 
"enable.segment.group.trim";
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+  public static final String SIZE_SEGMENT_GROUP_TRIM = 
"size.segment.group.trim";
+  public static final int DEFAULT_SEGMENT_TRIM_SIZE = -1;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
   // Limit on number of groups stored for each segment, beyond which no new 
group will be created
   private final int _numGroupsLimit;
   // Used for SQL GROUP BY (server combine)
   private final int _groupByTrimThreshold;
+  private final boolean _enableSegmentGroupTrim;
+  private final int _minSegmentTrimSize;

Review comment:
       As discussed offline, we can remove `_enableSegmentGroupTrim` and only 
maintain the trim size here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to