This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a5d5b7  Introduce in-Segment Trim for GroupBy OrderBy Query (#6991)
2a5d5b7 is described below

commit 2a5d5b7517a46ec76d73742b31c6d9e457920f21
Author: wuwenw <55009204+wuw...@users.noreply.github.com>
AuthorDate: Thu Jun 10 17:15:06 2021 -0400

    Introduce in-Segment Trim for GroupBy OrderBy Query (#6991)
    
    One of the major bottlenecks for the current GroupBy OrderBy query on
    high-cardinality columns is the merge phase: every segment brings a large
    number of intermediate results to a global concurrent map for further
    aggregation and merging, which takes up a lot of space and is very
    time-consuming. This PR introduces an optimization option that lets each
    segment trim its intermediate results to a given size. The size is
    configurable by the user and is guaranteed to be max(limit N * [...]
---
 .../pinot/core/data/table/IntermediateRecord.java  |  42 +++
 .../apache/pinot/core/data/table/TableResizer.java |  58 +++--
 .../operator/blocks/IntermediateResultsBlock.java  |  29 ++-
 .../combine/GroupByOrderByCombineOperator.java     |  37 ++-
 .../query/AggregationGroupByOrderByOperator.java   |  29 ++-
 .../plan/AggregationGroupByOrderByPlanNode.java    |  13 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  40 ++-
 .../groupby/DefaultGroupByExecutor.java            |  13 +
 .../groupby/DictionaryBasedGroupKeyGenerator.java  |  36 +++
 .../query/aggregation/groupby/GroupByExecutor.java |  18 ++
 .../aggregation/groupby/GroupKeyGenerator.java     |   5 +
 .../NoDictionaryMultiColumnGroupKeyGenerator.java  |   5 +
 .../NoDictionarySingleColumnGroupKeyGenerator.java |   5 +
 .../org/apache/pinot/core/util/GroupByUtils.java   |  15 +-
 .../pinot/core/data/table/TableResizerTest.java    |  85 ++++++
 .../groupby/GroupByInSegmentTrimTest.java          | 284 +++++++++++++++++++++
 .../InterSegmentOrderByMultiValueQueriesTest.java  |  14 +
 .../InterSegmentOrderBySingleValueQueriesTest.java |  16 ++
 18 files changed, 693 insertions(+), 51 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
new file mode 100644
index 0000000..520c3e8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+/**
+ * Helper class to store a subset of Record fields.
+ * IntermediateRecord is derived from a Record.
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. The Key in an IntermediateRecord is expected to be identical to the one in the Record
+ * 2. For values, an IntermediateRecord should only have the columns needed for order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results are extracted
+ * 5. There is a mandatory field to store the original record, to avoid a duplicate lookup
+ */
+public class IntermediateRecord {
+  public final Key _key;
+  public final Comparable[] _values;
+  public final Record _record;
+
+  IntermediateRecord(Key key, Comparable[] values, Record record) {
+    _key = key;
+    _values = values;
+    _record = record;
+  }
+}
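
For context, a minimal usage sketch (not part of this commit; the wrapper class and literal values are hypothetical, and it assumes the same package because the constructor is package-private):

    package org.apache.pinot.core.data.table;

    class IntermediateRecordExample {
      static IntermediateRecord fromRow() {
        // Full row for GROUP BY d1, d2 ... ORDER BY d1: {d1, d2, sum(m1)}
        Key key = new Key(new Object[]{"a", 10});
        Record record = new Record(new Object[]{"a", 10, 25.0});
        // Only the ORDER BY column values, in ORDER BY sequence
        Comparable[] orderByValues = new Comparable[]{"a"};
        // The original record is kept so the merge phase can upsert it without a second lookup
        return new IntermediateRecord(key, orderByValues, record);
      }
    }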
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index e306350..9f95bc0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
@@ -34,6 +35,8 @@ import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.postaggregation.PostAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -128,7 +131,7 @@ public class TableResizer {
     for (int i = 0; i < _numOrderByExpressions; i++) {
       intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
     }
-    return new IntermediateRecord(key, intermediateRecordValues);
+    return new IntermediateRecord(key, intermediateRecordValues, record);
   }
 
   /**
@@ -166,7 +169,7 @@ public class TableResizer {
         PriorityQueue<IntermediateRecord> priorityQueue =
             convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
         for (IntermediateRecord recordToRetain : priorityQueue) {
-          trimmedRecordsMap.put(recordToRetain._key, recordsMap.get(recordToRetain._key));
+          trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record);
         }
         return trimmedRecordsMap;
       }
@@ -203,34 +206,51 @@ public class TableResizer {
     }
     int numRecordsToRetain = Math.min(numRecords, trimToSize);
     // make PQ of sorted records to retain
-    PriorityQueue<IntermediateRecord> priorityQueue = convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
+    PriorityQueue<IntermediateRecord> priorityQueue =
+        convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
     Record[] sortedArray = new Record[numRecordsToRetain];
     while (!priorityQueue.isEmpty()) {
       IntermediateRecord intermediateRecord = priorityQueue.poll();
-      Record record = recordsMap.get(intermediateRecord._key);
-      sortedArray[--numRecordsToRetain] = record;
+      sortedArray[--numRecordsToRetain] = intermediateRecord._record;
     }
     return Arrays.asList(sortedArray);
   }
 
   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for order by
-   * 3. Inside the values, the columns should be ordered by the order by sequence
-   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * Trims the aggregation results using a priority queue and returns the priority queue.
+   * This method is to be called from an individual segment if the intermediate results need to be trimmed.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  public PriorityQueue<IntermediateRecord> trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,
+      GroupByResultHolder[] groupByResultHolders, int size) {
+    int numAggregationFunctions = _aggregationFunctions.length;
+    int numColumns = numAggregationFunctions + _numGroupByExpressions;
 
-    IntermediateRecord(Key key, Comparable[] values) {
-      _key = key;
-      _values = values;
+    // Get the comparator (reversed, so the head of the queue is the worst retained record)
+    Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
+    PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
+    while (groupKeyIterator.hasNext()) {
+      // Iterate over the group keys and materialize each group's row
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      Object[] keys = groupKey._keys;
+      Object[] values = Arrays.copyOf(keys, numColumns);
+      int groupId = groupKey._groupId;
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        values[_numGroupByExpressions + i] =
+            _aggregationFunctions[i].extractGroupByResult(groupByResultHolders[i], groupId);
+      }
+      // Build the {key, order-by values, record} triple for this group
+      IntermediateRecord intermediateRecord = getIntermediateRecord(new Key(keys), new Record(values));
+      if (priorityQueue.size() < size) {
+        priorityQueue.offer(intermediateRecord);
+      } else {
+        IntermediateRecord peek = priorityQueue.peek();
+        if (comparator.compare(peek, intermediateRecord) < 0) {
+          priorityQueue.poll();
+          priorityQueue.offer(intermediateRecord);
+        }
+      }
     }
+    return priorityQueue;
   }
 
   /**
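
The eviction rule used by trimInSegmentResults above is easy to misread, so here is a self-contained sketch of the same top-K pattern with plain integers; all names are illustrative, not from this patch. The heap is ordered by the reversed ORDER BY comparator, so its head is always the worst retained group:

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class TopKSketch {
      public static void main(String[] args) {
        int size = 3;
        int[] orderByValues = {7, 1, 9, 4, 8, 2};                // stand-ins for per-group ORDER BY values
        Comparator<Integer> orderBy = Comparator.reverseOrder(); // e.g. ORDER BY value DESC
        Comparator<Integer> cmp = orderBy.reversed();            // heap comparator: head = worst retained
        PriorityQueue<Integer> heap = new PriorityQueue<>(size, cmp);
        for (int v : orderByValues) {
          if (heap.size() < size) {
            heap.offer(v);
          } else if (cmp.compare(heap.peek(), v) < 0) {
            heap.poll();   // evict the lowest-ranked retained group
            heap.offer(v);
          }
        }
        System.out.println(heap); // the three largest values (9, 8, 7), in heap order
      }
    }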
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index b50a836..ac99fff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -57,6 +58,7 @@ public class IntermediateResultsBlock implements Block {
   private AggregationGroupByResult _aggregationGroupByResult;
   private List<Map<String, Object>> _combinedAggregationGroupByResult;
   private List<ProcessingException> _processingExceptions;
+  private Collection<IntermediateRecord> _intermediateRecords;
   private long _numDocsScanned;
   private long _numEntriesScannedInFilter;
   private long _numEntriesScannedPostFilter;
@@ -117,6 +119,17 @@ public class IntermediateResultsBlock implements Block {
     _dataSchema = dataSchema;
   }
 
+  /**
+   * Constructor for aggregation group-by order-by result with a collection of pre-trimmed intermediate records.
+   */
+  public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
+      Collection<IntermediateRecord> intermediateRecords, DataSchema dataSchema) {
+    _aggregationFunctions = aggregationFunctions;
+    _dataSchema = dataSchema;
+    _intermediateRecords = intermediateRecords;
+  }
+
   public IntermediateResultsBlock(Table table) {
     _table = table;
     _dataSchema = table.getDataSchema();
@@ -210,14 +223,14 @@ public class IntermediateResultsBlock implements Block {
     _executionThreadCpuTimeNs = executionThreadCpuTimeNs;
   }
 
-  public void setNumServerThreads(int numServerThreads) {
-    _numServerThreads = numServerThreads;
-  }
-
   public int getNumServerThreads() {
     return _numServerThreads;
   }
 
+  public void setNumServerThreads(int numServerThreads) {
+    _numServerThreads = numServerThreads;
+  }
+
   @VisibleForTesting
   public long getNumDocsScanned() {
     return _numDocsScanned;
@@ -281,6 +294,14 @@ public class IntermediateResultsBlock implements Block {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  /**
+   * Returns the collection of intermediate records, or {@code null} if this block does not carry pre-trimmed results
+   */
+  @Nullable
+  public Collection<IntermediateRecord> getIntermediateRecords() {
+    return _intermediateRecords;
+  }
+
   public DataTable getDataTable()
       throws Exception {
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 071a682..a230561 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
@@ -128,19 +130,30 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       }
 
       // Merge aggregation group-by result.
-      AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
-      if (aggregationGroupByResult != null) {
-        // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
-        Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
-        while (groupKeyIterator.hasNext()) {
-          GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
-          Object[] keys = groupKey._keys;
-          Object[] values = Arrays.copyOf(keys, _numColumns);
-          int groupId = groupKey._groupId;
-          for (int i = 0; i < _numAggregationFunctions; i++) {
-            values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+      Collection<IntermediateRecord> intermediateRecords = intermediateResultsBlock.getIntermediateRecords();
+      // For now, only the GroupBy OrderBy query has pre-constructed intermediate records
+      if (intermediateRecords == null) {
+        // Merge aggregation group-by result.
+        AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+        if (aggregationGroupByResult != null) {
+          // Iterate over the group-by keys; for each key, update the group-by result in the indexedTable
+          Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+          while (groupKeyIterator.hasNext()) {
+            GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+            Object[] keys = groupKey._keys;
+            Object[] values = Arrays.copyOf(keys, _numColumns);
+            int groupId = groupKey._groupId;
+            for (int i = 0; i < _numAggregationFunctions; i++) {
+              values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+            }
+            _indexedTable.upsert(new Key(keys), new Record(values));
           }
-          _indexedTable.upsert(new Key(keys), new Record(values));
+        }
+      } else {
+        for (IntermediateRecord intermediateResult : intermediateRecords) {
+          // TODO: change the upsert API so that it accepts IntermediateRecord directly
+          _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
         }
       }
     } catch (EarlyTerminationException e) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 883d718..ebc6947 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.core.operator.query;
 
+import java.util.Collection;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -28,8 +31,11 @@ import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
 
+import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity;
+
 
 /**
  * The <code>AggregationGroupByOrderByOperator</code> class provides the operator for aggregation group-by query on a
@@ -43,16 +49,19 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
   private final ExpressionContext[] _groupByExpressions;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final int _minSegmentTrimSize;
   private final TransformOperator _transformOperator;
   private final long _numTotalDocs;
   private final boolean _useStarTree;
   private final DataSchema _dataSchema;
+  private final QueryContext _queryContext;
 
   private int _numDocsScanned = 0;
 
   public AggregationGroupByOrderByOperator(AggregationFunction[] aggregationFunctions,
       ExpressionContext[] groupByExpressions, int maxInitialResultHolderCapacity, int numGroupsLimit,
-      TransformOperator transformOperator, long numTotalDocs, boolean useStarTree) {
+      int minSegmentTrimSize, TransformOperator transformOperator, long numTotalDocs, QueryContext queryContext,
+      boolean useStarTree) {
     _aggregationFunctions = aggregationFunctions;
     _groupByExpressions = groupByExpressions;
     _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
@@ -60,6 +69,8 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
     _transformOperator = transformOperator;
     _numTotalDocs = numTotalDocs;
     _useStarTree = useStarTree;
+    _queryContext = queryContext;
+    _minSegmentTrimSize = minSegmentTrimSize;
 
     // NOTE: The indexedTable expects that the data schema will have group by columns before aggregation columns
     int numGroupByExpressions = groupByExpressions.length;
@@ -106,8 +117,20 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
       groupByExecutor.process(transformBlock);
     }
 
-    // Build intermediate result block based on aggregation group-by result from the executor
-    return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    // No trim when the query has no OrderBy, or when minSegmentTrimSize is non-positive
+    if (_queryContext.getOrderByExpressions() == null || _minSegmentTrimSize <= 0) {
+      // Build intermediate result block based on aggregation group-by result from the executor
+      return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    }
+    int trimSize = getTableCapacity(_queryContext.getLimit(), _minSegmentTrimSize);
+    // The number of groups hasn't reached the threshold
+    if (groupByExecutor.getNumGroups() <= trimSize) {
+      return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    }
+    // Trim the per-segment results before handing them to the combine phase
+    TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext);
+    Collection<IntermediateRecord> intermediateRecords = groupByExecutor.trimGroupByResult(trimSize, tableResizer);
+    return new IntermediateResultsBlock(_aggregationFunctions, intermediateRecords, _dataSchema);
   }
 
   @Override
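
A worked example of the trim decision in getNextBlock, with hypothetical values (in a real deployment minSegmentTrimSize comes from server config):

    public class TrimDecisionExample {
      public static void main(String[] args) {
        int limit = 10;                // query LIMIT
        int minSegmentTrimSize = 5000; // hypothetical configured minimum
        int trimSize = Math.max(limit * 5, minSegmentTrimSize);
        System.out.println(trimSize);  // 5000
        // A segment holding 1,000,000 groups is trimmed to 5000 before the combine
        // phase; a segment holding 3000 groups (<= trimSize) is forwarded as-is.
      }
    }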
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index a392948..915eb5e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -33,7 +33,6 @@ import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
-
 /**
  * The <code>AggregationGroupByOrderByPlanNode</code> class provides the execution plan for aggregation group-by order-by query on a
  * single segment.
@@ -43,13 +42,15 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
   private final IndexSegment _indexSegment;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final int _minSegmentTrimSize;
   private final AggregationFunction[] _aggregationFunctions;
   private final ExpressionContext[] _groupByExpressions;
   private final TransformPlanNode _transformPlanNode;
   private final StarTreeTransformPlanNode _starTreeTransformPlanNode;
+  private final QueryContext _queryContext;
 
   public AggregationGroupByOrderByPlanNode(IndexSegment indexSegment, QueryContext queryContext,
-      int maxInitialResultHolderCapacity, int numGroupsLimit) {
+      int maxInitialResultHolderCapacity, int numGroupsLimit, int minSegmentTrimSize) {
     _indexSegment = indexSegment;
     _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
     _numGroupsLimit = numGroupsLimit;
@@ -58,6 +59,8 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
     List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
     assert groupByExpressions != null;
     _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]);
+    _queryContext = queryContext;
+    _minSegmentTrimSize = minSegmentTrimSize;
 
     List<StarTreeV2> starTrees = indexSegment.getStarTrees();
     if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) {
@@ -95,11 +98,13 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode {
     if (_transformPlanNode != null) {
       // Do not use star-tree
       return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _transformPlanNode.run(), numTotalDocs, false);
+          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
+          _queryContext, false);
     } else {
       // Use star-tree
       return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _starTreeTransformPlanNode.run(), numTotalDocs, true);
+          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(),
+          numTotalDocs, _queryContext, true);
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 4c801e3..3f94d80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.core.util.QueryOptions;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -56,28 +57,32 @@ import org.slf4j.LoggerFactory;
  * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim";
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
+  public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "min.segment.group.trim.size";
+  public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
   // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
   // Used for SQL GROUP BY (server combine)
   private final int _groupByTrimThreshold;
+  private final int _minSegmentGroupTrimSize;
 
   @VisibleForTesting
   public InstancePlanMakerImplV2() {
     _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
     _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
     _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+    _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
   }
 
   @VisibleForTesting
@@ -85,6 +90,15 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
     _numGroupsLimit = numGroupsLimit;
     _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+    _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
+  }
+
+  @VisibleForTesting
+  public InstancePlanMakerImplV2(int minSegmentGroupTrimSize) {
+    _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
+    _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
+    _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
+    _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
   }
 
   /**
@@ -105,8 +119,22 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
         "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
-    LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}",
-        _maxInitialResultHolderCapacity, _numGroupsLimit);
+    boolean enableSegmentGroupTrim =
+        queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, DEFAULT_ENABLE_SEGMENT_GROUP_TRIM);
+    int minSegmentGroupTrimSize =
+        queryExecutorConfig.getConfig().getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
+
+    if (minSegmentGroupTrimSize > 0) {
+      _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
+    } else if (enableSegmentGroupTrim) {
+      _minSegmentGroupTrimSize = GroupByUtils.DEFAULT_MIN_NUM_GROUPS;
+    } else {
+      _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
+    }
+    LOGGER.info(
+        "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, enableSegmentTrim: {}, minSegmentGroupTrimSize: {}",
+        _maxInitialResultHolderCapacity, _numGroupsLimit, minSegmentGroupTrimSize > 0 || enableSegmentGroupTrim,
+        _minSegmentGroupTrimSize);
   }
 
   @Override
@@ -138,7 +166,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
         if (queryOptions.isGroupByModeSQL()) {
           return new AggregationGroupByOrderByPlanNode(indexSegment, queryContext, _maxInitialResultHolderCapacity,
-              _numGroupsLimit);
+              _numGroupsLimit, _minSegmentGroupTrimSize);
         }
         return new AggregationGroupByPlanNode(indexSegment, queryContext, _maxInitialResultHolderCapacity,
             _numGroupsLimit);
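
A plausible server configuration sketch for the two new keys; the pinot.server.query.executor prefix is inferred from the groupby.trim.threshold comment above and should be verified, as this diff does not state the full property names:

    # Hypothetical pinot-server config (prefix assumed, not confirmed by this diff)
    pinot.server.query.executor.enable.segment.group.trim=true
    # Or set an explicit per-segment floor instead of the 5000-group default:
    pinot.server.query.executor.min.segment.group.trim.size=5000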
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index d67b4f2..de9cfa9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby;
 
+import java.util.Collection;
 import java.util.Map;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
@@ -144,4 +147,14 @@ public class DefaultGroupByExecutor implements GroupByExecutor {
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public int getNumGroups() {
+    return _groupKeyGenerator.getNumKeys();
+  }
+
+  @Override
+  public Collection<IntermediateRecord> trimGroupByResult(int trimSize, TableResizer tableResizer) {
+    return tableResizer.trimInSegmentResults(_groupKeyGenerator.getGroupKeys(), _groupByResultHolders, trimSize);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index 79b7afe..3c63a0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -206,6 +206,9 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     return _rawKeyHolder.getStringGroupKeys();
   }
 
+  @Override
+  public int getNumKeys() {
+    return _rawKeyHolder.getNumKeys();
+  }
+
   private interface RawKeyHolder {
 
     /**
@@ -240,11 +243,18 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     * Returns an iterator of {@link StringGroupKey}. Use this interface to iterate through all the group keys.
      */
     Iterator<StringGroupKey> getStringGroupKeys();
+
+    /**
+     * Returns the current number of unique keys
+     */
+    int getNumKeys();
+
   }
 
   private class ArrayBasedHolder implements RawKeyHolder {
    // TODO: using a bitmap might be better
     private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
+    private int _numKeys = 0;
 
     @Override
     public void processSingleValue(int numDocs, int[] outGroupIds) {
@@ -254,6 +264,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
           groupId = groupId * _cardinalities[j] + _singleValueDictIds[j][i];
         }
         outGroupIds[i] = groupId;
+        // If the flag is not set yet, this is a new group; increment the key count
+        if (!_flags[groupId]) {
+          _numKeys++;
+        }
         _flags[groupId] = true;
       }
     }
@@ -263,6 +277,9 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       for (int i = 0; i < numDocs; i++) {
         int[] groupIds = getIntRawKeys(i);
         for (int groupId : groupIds) {
+          if (!_flags[groupId]) {
+            _numKeys++;
+          }
           _flags[groupId] = true;
         }
         outGroupIds[i] = groupIds;
@@ -331,6 +348,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
         }
       };
     }
+
+    @Override
+    public int getNumKeys() {
+      return _numKeys;
+    }
   }
 
   private class IntMapBasedHolder implements RawKeyHolder {
@@ -419,6 +441,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
         }
       };
     }
+
+    @Override
+    public int getNumKeys() {
+      return _groupIdMap.size();
+    }
   }
 
   /**
@@ -634,6 +661,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
         }
       };
     }
+
+    @Override
+    public int getNumKeys() {
+      return _groupIdMap.size();
+    }
   }
 
   /**
@@ -836,6 +867,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
         }
       };
     }
+
+    @Override
+    public int getNumKeys() {
+      return _groupIdMap.size();
+    }
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
index c1266c5..869ef5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby;
 
+import java.util.Collection;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.data.table.TableResizer;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 
 
@@ -40,4 +43,19 @@ public interface GroupByExecutor {
    * @return Result of aggregation
    */
   AggregationGroupByResult getResult();
+
+  /**
+   * Returns the number of groups generated so far
+   *
+   * @return Number of groups
+   */
+  int getNumGroups();
+
+  /**
+   * Trims the GroupBy result down to the given trim size, max(limit * 5, minSegmentTrimSize)
+   * TODO: benchmark the performance of PQ vs. topK
+   * <p>Should be called after all transform blocks have been processed.
+   */
+  Collection<IntermediateRecord> trimGroupByResult(int trimSize, TableResizer tableResizer);
 }
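
A sketch of the intended call order against this interface; the wiring is hypothetical, and only GroupByExecutor, TransformBlock, TableResizer, and IntermediateRecord come from this patch (java.util.Collection and java.util.List assumed imported):

    // Sketch only: process every transform block first, then check the group
    // count, and only then trim.
    Collection<IntermediateRecord> processAndTrim(GroupByExecutor executor,
        List<TransformBlock> blocks, TableResizer tableResizer, int trimSize) {
      for (TransformBlock block : blocks) {
        executor.process(block);       // must complete before trimming
      }
      if (executor.getNumGroups() <= trimSize) {
        return null;                   // caller falls back to getResult()
      }
      return executor.trimGroupByResult(trimSize, tableResizer);
    }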
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
index c9ceaea..592ad19 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
@@ -78,6 +78,11 @@ public interface GroupKeyGenerator {
   Iterator<StringGroupKey> getStringGroupKeys();
 
   /**
+   * Returns the current number of unique keys
+   */
+  int getNumKeys();
+
+  /**
    * This class encapsulates the integer group id and the group keys.
    */
   class GroupKey {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index d8c7c8e..9f9422a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -337,6 +337,11 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
     }
   }
 
+  @Override
+  public int getNumKeys() {
+    return _groupKeyMap.size();
+  }
+
   /**
    * Iterator for {@link GroupKey}.
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
index 12b827b..cfee809 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
@@ -200,6 +200,11 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
     }
   }
 
+  @Override
+  public int getNumKeys() {
+    return _groupKeyMap.size();
+  }
+
   private int getKeyForValue(int value) {
     Int2IntMap map = (Int2IntMap) _groupKeyMap;
     int groupId = map.get(value);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index dd29a29..bf0aa03 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -25,7 +25,7 @@ public final class GroupByUtils {
   private GroupByUtils() {
   }
 
-  private static final int NUM_RESULTS_LOWER_LIMIT = 5000;
+  public static final int DEFAULT_MIN_NUM_GROUPS = 5000;
 
   /**
   * (For PQL semantic) Returns the capacity of the table required by the given query.
@@ -33,7 +33,16 @@ public final class GroupByUtils {
    *       in PQL semantic.
    */
   public static int getTableCapacity(int limit) {
-    return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
+    return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS);
+  }
+
+  /**
+   * Returns the capacity of the table required by the given query.
+   * NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups is configured by the user
+   *      (Default: 5000)
+   */
+  public static int getTableCapacity(int limit, int minNumGroups) {
+    return Math.max(limit * 5, minNumGroups);
   }
 
   /**
@@ -46,7 +55,7 @@ public final class GroupByUtils {
   public static int getTableCapacity(QueryContext queryContext) {
     int limit = queryContext.getLimit();
     if (queryContext.getOrderByExpressions() != null || queryContext.getHavingFilter() != null) {
-      return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT);
+      return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS);
     } else {
       return limit;
     }
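
A quick worked example of the capacity rule (values arbitrary): the 5x multiplier keeps enough groups per segment for the ordered merge to stay accurate, and the floor prevents small LIMITs from over-trimming.

    public class TableCapacityExample {
      public static void main(String[] args) {
        // Default floor of 5000 groups:
        System.out.println(Math.max(10 * 5, 5000));   // LIMIT 10   -> capacity 5000
        // User-configured floor via min.segment.group.trim.size:
        System.out.println(Math.max(2000 * 5, 5000)); // LIMIT 2000 -> capacity 10000
      }
    }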
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index 0f8eb1a..bf85846 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -21,9 +21,15 @@ package org.apache.pinot.core.data.table;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.groupby.DoubleGroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.segment.local.customobject.AvgPair;
 import org.testng.annotations.BeforeClass;
@@ -43,10 +49,13 @@ public class TableResizerTest {
       new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"},
           new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT});
   private static final int TRIM_TO_SIZE = 3;
+  private static final int NUM_RESULT_HOLDER = 4;
 
   private Map<Key, Record> _recordsMap;
   private List<Record> _records;
   private List<Key> _keys;
+  private List<GroupKeyGenerator.GroupKey> _groupKeys;
+  private GroupByResultHolder[] _groupByResultHolders;
 
   @BeforeClass
   public void setUp() {
@@ -66,6 +75,35 @@ public class TableResizerTest {
         new Key(new Object[]{"c", 300, 5.0})
     );
     //@formatter:on
+    List<Object[]> objectArray = Arrays.asList(
+        new Object[]{"a", 10, 1.0},
+        new Object[]{"b", 10, 2.0},
+        new Object[]{"c", 200, 3.0},
+        new Object[]{"c", 50, 4.0},
+        new Object[]{"c", 300, 5.0});
+
+    // Use _keys for _groupKeys
+    _groupKeys = new LinkedList<>();
+    for (int i = 0; i < _keys.size(); ++i) {
+      GroupKeyGenerator.GroupKey groupKey = new GroupKeyGenerator.GroupKey();
+      groupKey._keys = objectArray.get(i);
+      groupKey._groupId = i;
+      _groupKeys.add(groupKey);
+    }
+
+    // groupByResults are the same as _records
+    _groupByResultHolders = new GroupByResultHolder[NUM_RESULT_HOLDER];
+    _groupByResultHolders[0] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+    _groupByResultHolders[1] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0);
+    _groupByResultHolders[2] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+    _groupByResultHolders[3] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size());
+    for (int i = 0; i < _groupKeys.size(); ++i) {
+      _groupByResultHolders[0].setValueForKey(_groupKeys.get(i)._groupId, (double) _records.get(i).getValues()[3]);
+      _groupByResultHolders[1].setValueForKey(_groupKeys.get(i)._groupId, (double) _records.get(i).getValues()[4]);
+      _groupByResultHolders[2].setValueForKey(_groupKeys.get(i)._groupId, _records.get(i).getValues()[5]);
+      _groupByResultHolders[3].setValueForKey(_groupKeys.get(i)._groupId, _records.get(i).getValues()[6]);
+    }
+
     _recordsMap = new HashMap<>();
     int numRecords = _records.size();
     for (int i = 0; i < numRecords; i++) {
@@ -289,4 +327,51 @@ public class TableResizerTest {
     assertEquals(sortedRecords.get(1), _records.get(0));
     assertEquals(sortedRecords.get(2), _records.get(3));
   }
+
+  /**
+   * Tests in-segment trim from 5 records down to TRIM_TO_SIZE (3) records
+   */
+  @Test
+  public void testInSegmentTrim() {
+    TableResizer tableResizer =
+        new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d3 DESC"));
+    PriorityQueue<IntermediateRecord> results =
+        tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE);
+    assertEquals(results.size(), TRIM_TO_SIZE);
+    IntermediateRecord[] resultArray = new IntermediateRecord[results.size()];
+    for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+      IntermediateRecord result = results.poll();
+      resultArray[i] = result;
+    }
+    //  _records[4],  _records[3],  _records[2]
+    assertEquals(resultArray[0]._record, _records.get(2));
+    assertEquals(resultArray[1]._record, _records.get(3));
+    assertEquals(resultArray[2]._record, _records.get(4));
+
+    tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils
+        .getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC, DISTINCTCOUNT(m3) DESC"));
+    results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE);
+    assertEquals(results.size(), TRIM_TO_SIZE);
+    for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+      IntermediateRecord result = results.poll();
+      resultArray[i] = result;
+    }
+    // _records[2],  _records[3],  _records[1]
+    assertEquals(resultArray[0]._record, _records.get(1));
+    assertEquals(resultArray[1]._record, _records.get(3));
+    assertEquals(resultArray[2]._record, _records.get(2));
+
+    tableResizer = new TableResizer(DATA_SCHEMA,
+        QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, AVG(m4) ASC"));
+    results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE);
+    assertEquals(results.size(), TRIM_TO_SIZE);
+    for (int i = 0; i < TRIM_TO_SIZE; ++i) {
+      IntermediateRecord result = results.poll();
+      resultArray[i] = result;
+    }
+    // _records[4],  _records[3],  _records[1]
+    assertEquals(resultArray[0]._record, _records.get(1));
+    assertEquals(resultArray[1]._record, _records.get(3));
+    assertEquals(resultArray[2]._record, _records.get(4));
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
new file mode 100644
index 0000000..c50ef8e
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.groupby;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.table.IntermediateRecord;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
+import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.Pair;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static java.lang.Math.max;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Unit test for GroupBy trim functionality.
+ * - Builds a segment with generated data.
+ * - Uses the AggregationGroupByOrderByPlanNode class to construct an AggregationGroupByOrderByOperator.
+ * - Performs aggregation GroupBy and OrderBy on the data.
+ * - Also computes the expected results locally.
+ * - Asserts that the aggregation results returned by the operator are the same as
+ *   those returned by the local computations.
+ *
+ * Currently tests the 'max' function, and can be easily extended to
+ * test other conditions such as GroupBy without OrderBy.
+ */
+public class GroupByInSegmentTrimTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "GroupByInSegmentTrimTest");
+  private static final String SEGMENT_NAME = "TestGroupByInSegment";
+
+  private static final String METRIC_PREFIX = "metric_";
+  private static final int NUM_ROWS = 1000;
+  private static final int NUM_COLUMN = 2;
+  private static final int MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
+  private static final int NUM_GROUPS_LIMIT = 100_000;
+  private static IndexSegment _indexSegment;
+  private static String[] _columns;
+  private static double[][] _inputData;
+  private static Map<Double, Double> _resultMap;
+
+  /**
+   * Initializations prior to the test:
+   * - Build a segment with metric columns (that will be aggregated and grouped) containing
+   *   generated data.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _resultMap = new HashMap<>();
+    // Current schema columns: metric_0 (double), metric_1 (double)
+    _inputData = new double[NUM_COLUMN][NUM_ROWS];
+    _columns = new String[NUM_COLUMN];
+    setupSegment();
+  }
+
+  /**
+   * Tests the GroupBy OrderBy query and verifies the result against the expected values
+   */
+  @Test(dataProvider = "QueryDataProvider")
+  void testGroupByOrderByOperator(int trimSize, List<Pair<Double, Double>> expectedResult, QueryContext queryContext) {
+    // Create a query plan
+    AggregationGroupByOrderByPlanNode aggregationGroupByOrderByPlanNode =
+        new AggregationGroupByOrderByPlanNode(_indexSegment, queryContext, MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+            NUM_GROUPS_LIMIT, trimSize);
+
+    // Get the query executor
+    AggregationGroupByOrderByOperator aggregationGroupByOrderByOperator = aggregationGroupByOrderByPlanNode.run();
+
+    // Extract the execution result
+    IntermediateResultsBlock resultsBlock = aggregationGroupByOrderByOperator.nextBlock();
+    ArrayList<Pair<Double, Double>> extractedResult = extractTestResult(resultsBlock);
+
+    assertEquals(extractedResult, expectedResult);
+  }
+
+  /**
+   * Helper method to set up the index segment on which to perform aggregation tests.
+   * - Generates a segment with {@link #NUM_COLUMN} columns and {@link #NUM_ROWS} rows
+   * - Fills 'double' data into the metric columns. The data is also populated
+   *   into _inputData[], so it can be used to verify the results.
+   *
+   * @throws Exception
+   */
+  private void setupSegment()
+      throws Exception {
+    if (INDEX_DIR.exists()) {
+      FileUtils.deleteQuietly(INDEX_DIR);
+    }
+
+    // Segment Config
+    SegmentGeneratorConfig config =
+        new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(),
+            buildSchema());
+    config.setSegmentName(SEGMENT_NAME);
+    config.setOutDir(INDEX_DIR.getAbsolutePath());
+
+    // Fill the data table
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    int baseValue = 10;
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow genericRow = new GenericRow();
+
+      for (int j = 0; j < _columns.length; j++) {
+        String metricName = _columns[j];
+        double value = baseValue + i + j;
+        _inputData[j][i] = value;
+        genericRow.putValue(metricName, value);
+      }
+      // Compute the max result and insert into a grouped map
+      computeMaxResult(_inputData[0][i], _inputData[1][i]);
+      rows.add(genericRow);
+      baseValue += 10;
+    }
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+
+    _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.heap);
+  }
+
+  /**
+   * Helper method to build the schema for the segment on which aggregation tests will be run.
+   *
+   * @return table schema
+   */
+  private Schema buildSchema() {
+    Schema schema = new Schema();
+
+    for (int i = 0; i < NUM_COLUMN; i++) {
+      String metricName = METRIC_PREFIX + i;
+      MetricFieldSpec metricFieldSpec = new MetricFieldSpec(metricName, FieldSpec.DataType.DOUBLE);
+      schema.addField(metricFieldSpec);
+      _columns[i] = metricName;
+    }
+    return schema;
+  }
+
+  /**
+   * Helper method to compute the aggregation result grouped by the key
+   *
+   */
+  private void computeMaxResult(Double key, Double result) {
+    if (_resultMap.get(key) == null || _resultMap.get(key) < result) {
+      _resultMap.put(key, result);
+    }
+  }
+
+  /**
+   * Helper method to extract the result from IntermediateResultsBlock
+   *
+   * @return A list of expected results
+   */
+  private ArrayList<Pair<Double, Double>> extractTestResult(IntermediateResultsBlock resultsBlock) {
+    AggregationGroupByResult result = resultsBlock.getAggregationGroupByResult();
+    if (result != null) {
+      // No trim
+      return extractAggregationResult(result);
+    } else {
+      // In case of trim
+      return extractIntermediateResult(resultsBlock.getIntermediateRecords());
+    }
+  }
+
+  /**
+   * Helper method to extract the result from AggregationGroupByResult
+   *
+   * @return A list of expected results
+   */
+  private ArrayList<Pair<Double, Double>> extractAggregationResult(AggregationGroupByResult aggregationGroupByResult) {
+    ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+    Iterator<GroupKeyGenerator.GroupKey> iterator = aggregationGroupByResult.getGroupKeyIterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      GroupKeyGenerator.GroupKey groupKey = iterator.next();
+      Double key = (Double) groupKey._keys[0];
+      Double value = (Double) aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
+      result.add(new Pair<>(key, value));
+    }
+    result.sort((o1, o2) -> Double.compare(o2.getSecond(), o1.getSecond()));
+    return result;
+  }
+
+  /**
+   * Helper method to extract the result from Collection<IntermediateRecord>
+   *
+   * @return A list of expected results
+   */
+  private ArrayList<Pair<Double, Double>> extractIntermediateResult(Collection<IntermediateRecord> intermediateRecord) {
+    ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+    PriorityQueue<IntermediateRecord> resultPQ = new PriorityQueue<>(intermediateRecord);
+    while (!resultPQ.isEmpty()) {
+      IntermediateRecord head = resultPQ.poll();
+      result.add(new Pair<>((Double) head._record.getValues()[0], (Double) head._record.getValues()[1]));
+    }
+    Collections.reverse(result);
+    return result;
+  }
+
+  @DataProvider
+  public static Object[][] QueryDataProvider() {
+    List<Object[]> data = new ArrayList<>();
+    ArrayList<Pair<Double, Double>> expectedResult = computeExpectedResult();
+    // Testcase1: low limit + high trim size
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+        "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 1");
+    int trimSize = 100;
+    int expectedSize = max(trimSize, 5 * queryContext.getLimit());
+    data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext});
+    // Testcase2: high limit + low trim size
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+        "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 50");
+    trimSize = 10;
+    expectedSize = max(trimSize, 5 * queryContext.getLimit());
+    data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext});
+    // Testcase3: high limit + high trim size (No trim)
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL(
+        "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 500");
+    trimSize = 1000;
+    expectedSize = 1000;
+    data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext});
+
+    return data.toArray(new Object[data.size()][]);
+  }
+
+  /**
+   * Helper method to compute the expected result
+   *
+   * @return A list of expected results
+   */
+  private static ArrayList<Pair<Double, Double>> computeExpectedResult() {
+    ArrayList<Pair<Double, Double>> result = new ArrayList<>();
+    for (Map.Entry<Double, Double> entry : _resultMap.entrySet()) {
+      result.add(new Pair<>(entry.getKey(), entry.getValue()));
+    }
+    result.sort((o1, o2) -> Double.compare(o2.getSecond(), o1.getSecond()));
+    return result;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
index c9ce9db..5dfcead 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -42,6 +43,19 @@ public class InterSegmentOrderByMultiValueQueriesTest extends BaseMultiValueQuer
   }
 
   /**
+   * Tests the in-segment trim option for GroupBy OrderBy queries; results should match the untrimmed path.
+   */
+  @Test(dataProvider = "orderByDataProvider")
+  public void testGroupByOrderByMVSegmentTrimSQLResults(String query, List<Object[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1);
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker);
+    QueriesTestUtils
+        .testInterSegmentResultTable(brokerResponse, 400000L, 0, expectedNumEntriesScannedPostFilter, 400000L,
+            expectedResults, expectedResults.size(), expectedDataSchema);
+  }
+
+  /**
    * Provides various combinations of order by.
    * In order to calculate the expected results, the results from a group by were taken, and then ordered accordingly.
    */
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
index a10e542..ddd2d11 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.testng.Assert;
@@ -73,6 +74,21 @@ public class InterSegmentOrderBySingleValueQueriesTest extends BaseSingleValueQu
   }
 
   /**
+   * Tests the in-segment trim option for GroupBy OrderBy queries; results should match the untrimmed path.
+   */
+  @Test(dataProvider = "orderBySQLResultTableProvider")
+  public void testGroupByOrderBySVSegmentTrimSQLResults(String query, List<Object[]> expectedResults,
+      long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter,
+      long expectedNumTotalDocs, DataSchema expectedDataSchema) {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1);
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker);
+    QueriesTestUtils
+        .testInterSegmentResultTable(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, expectedResults, expectedResults.size(),
+            expectedDataSchema);
+  }
+
+  /**
    * Tests the query options for groupByMode, responseFormat.
    * pql, pql - does not execute order by, returns aggregationResults
    * pql, sql - does not execute order by, returns aggregationResults
