Jackie-Jiang commented on a change in pull request #6991:
URL: https://github.com/apache/incubator-pinot/pull/6991#discussion_r641733538



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,
+      int threshold, TableResizer tableResizer) {
+    // Check if a trim is necessary
+    int keyNum = 0;
+    if (_hasMVGroupByExpression) {
+      keyNum = _mvGroupKeys.length;
+    } else {
+      keyNum = _svGroupKeys.length;
+    }

Review comment:
      This part is incorrect. `_svGroupKeys` and `_mvGroupKeys` are just buffers, and their length is always 10000, regardless of how many groups the segment actually has.
   You might want to add an API in `GroupKeyGenerator` to return the number of keys.
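A minimal sketch of that API, using a hypothetical, simplified `SimpleGroupKeyGenerator` interface rather than Pinot's real `GroupKeyGenerator` (which has more methods). The point it illustrates: the trim decision should compare the number of distinct groups, not the length of the per-block buffers, against the trim size.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for GroupKeyGenerator; only the parts relevant here
interface SimpleGroupKeyGenerator {
  // Returns the group id for the key, creating a new group when the key is unseen
  int getGroupId(Object key);

  // Suggested addition: number of distinct group keys generated so far
  int getNumKeys();
}

final class MapBasedGroupKeyGenerator implements SimpleGroupKeyGenerator {
  private final Map<Object, Integer> _groupIdMap = new HashMap<>();

  @Override
  public int getGroupId(Object key) {
    Integer groupId = _groupIdMap.get(key);
    if (groupId == null) {
      // Group ids are assigned sequentially, so the current map size is the next id
      groupId = _groupIdMap.size();
      _groupIdMap.put(key, groupId);
    }
    return groupId;
  }

  @Override
  public int getNumKeys() {
    return _groupIdMap.size();
  }
}

final class SegmentTrimCheck {
  private SegmentTrimCheck() {
  }

  // Trim only when the number of groups (not a buffer length) exceeds the trim size
  static boolean isTrimNeeded(SimpleGroupKeyGenerator groupKeyGenerator, int trimSize) {
    return groupKeyGenerator.getNumKeys() > trimSize;
  }
}
```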

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -249,6 +327,44 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
     Comparable extract(Record record);
   }
 
+  /**
+   * Helper class to store a subset of Record fields
+   * IntermediateRecord is derived from a Record
+   * Some of the main properties of an IntermediateRecord are:
+   *
+   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
+   * 2. For values, IntermediateRecord should only have the columns needed for order by
+   * 3. Inside the values, the columns should be ordered by the order by sequence
+   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * 5. There is an optional field to store the original record. Each segment can keep the intermediate result in this
+   * form to prevent constructing IntermediateRecord again in the server.
+   */
+  public static class IntermediateRecord {

Review comment:
       Make this a separate class since it is shared by multiple classes
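A minimal sketch of the extracted class, assuming it keeps the three fields this PR uses (`_key`, `_values` and the optional cached `_record`). The type parameters `K` and `R` are placeholders for Pinot's `Key` and `Record` so that the snippet compiles on its own; in the PR it would presumably live in its own file next to `TableResizer`.

```java
import java.util.Objects;

/**
 * Sketch of IntermediateRecord as a standalone class.
 * K and R are placeholders for Pinot's Key and Record.
 */
@SuppressWarnings("rawtypes")
final class IntermediateRecord<K, R> {
  // Identical to the key of the source record
  final K _key;
  // Only the order-by values, in order-by sequence
  final Comparable[] _values;
  // Optional cached source record; null when not cached
  final R _record;

  IntermediateRecord(K key, Comparable[] values, R record) {
    _key = Objects.requireNonNull(key, "key");
    _values = Objects.requireNonNull(values, "values");
    _record = record;
  }

  // Convenience factory for callers that do not need the source record
  static <K, R> IntermediateRecord<K, R> withoutRecord(K key, Comparable[] values) {
    return new IntermediateRecord<>(key, values, null);
  }
}
```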

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -57,6 +58,7 @@
   private AggregationGroupByResult _aggregationGroupByResult;
   private List<Map<String, Object>> _combinedAggregationGroupByResult;
   private List<ProcessingException> _processingExceptions;
+  private Collection<TableResizer.IntermediateRecord> _intermediateCollection;

Review comment:
       ```suggestion
     private Collection<TableResizer.IntermediateRecord> _intermediateRecords;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -43,23 +46,29 @@
   private final ExpressionContext[] _groupByExpressions;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final int _inSegmentResultLimit;

Review comment:
      `_inSegmentResultLimit` and `_enableSegmentGroupTrim` can be combined into a single `_trimSize`, using a negative value to denote that trim is off.
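A sketch of the combined setting, assuming the operator keeps a single `int _trimSize` and treats any negative value as "trim off". `SegmentTrimSize` and its methods are hypothetical helper names.

```java
/**
 * Sketch: one int replaces the (enableSegmentGroupTrim, inSegmentResultLimit) pair.
 * A negative value means segment-level trim is off.
 */
final class SegmentTrimSize {
  static final int TRIM_DISABLED = -1;

  private SegmentTrimSize() {
  }

  // Collapses the two existing settings into a single trim size
  static int of(boolean enableSegmentGroupTrim, int inSegmentResultLimit) {
    return enableSegmentGroupTrim ? inSegmentResultLimit : TRIM_DISABLED;
  }

  // Any non-negative trim size means trim is on
  static boolean isTrimEnabled(int trimSize) {
    return trimSize >= 0;
  }
}
```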

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -49,6 +53,7 @@
   private final Map<ExpressionContext, Integer> _groupByExpressionIndexMap;
   private final AggregationFunction[] _aggregationFunctions;
   private final Map<FunctionContext, Integer> _aggregationFunctionIndexMap;
+  private final boolean _hasOrderBy;

Review comment:
      We should only use `TableResizer` when the query has an order-by. Please check this on the caller side before creating the `TableResizer`.
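A minimal sketch of the caller-side check, assuming the order-by expressions are available as a nullable list (modeled here as `List<String>`). The `TableResizer` below is a hypothetical stand-in that simply requires an ORDER BY, which is what removes the need for a `_hasOrderBy` flag.

```java
import java.util.List;

final class OrderByResizerFactory {
  private OrderByResizerFactory() {
  }

  // Hypothetical stand-in for TableResizer: it assumes an ORDER BY exists instead of tracking a flag
  static final class TableResizer {
    private final List<String> _orderByExpressions;

    TableResizer(List<String> orderByExpressions) {
      if (orderByExpressions == null || orderByExpressions.isEmpty()) {
        throw new IllegalArgumentException("TableResizer requires ORDER BY expressions");
      }
      _orderByExpressions = orderByExpressions;
    }

    int getNumOrderByExpressions() {
      return _orderByExpressions.size();
    }
  }

  // Caller-side check: returns null when the query has no ORDER BY, so no resizer is created
  static TableResizer createIfOrderBy(List<String> orderByExpressions) {
    if (orderByExpressions == null || orderByExpressions.isEmpty()) {
      return null;
    }
    return new TableResizer(orderByExpressions);
  }
}
```

With the check on the caller side, a status query such as `getOrderByStatus()` on the resizer becomes unnecessary: the existence of a resizer already implies an ORDER BY.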

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -214,23 +230,85 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
   }
 
   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for order by
-   * 3. Inside the values, the columns should be ordered by the order by sequence
-   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * Helper class to make an IntermediateRecord with the record.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  private IntermediateRecord getInSegmentIntermediateRecord(Key key, Record record) {

Review comment:
       Remove this method and directly modify `getIntermediateRecord()`. The 
cached record can be used to simplify line 184: 
`recordsMap.get(recordToRetain._key)` -> `recordToRetain._record`
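A minimal sketch of the simplification, assuming each retained `IntermediateRecord` carries its cached source record. `RetainedRecords` and `buildTrimmedMap` are hypothetical names; only the replacement `recordsMap.get(recordToRetain._key)` -> `recordToRetain._record` comes from the comment.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class RetainedRecords {
  private RetainedRecords() {
  }

  // Simplified stand-in for IntermediateRecord; K and R are placeholders for Pinot's Key and Record
  static final class IntermediateRecord<K, R> {
    final K _key;
    final R _record;  // cached source record

    IntermediateRecord(K key, R record) {
      _key = key;
      _record = record;
    }
  }

  // Builds the trimmed records map from the records chosen to be retained
  static <K, R> Map<K, R> buildTrimmedMap(Collection<IntermediateRecord<K, R>> recordsToRetain) {
    Map<K, R> trimmedRecordsMap = new HashMap<>();
    for (IntermediateRecord<K, R> recordToRetain : recordsToRetain) {
      // Before: recordsMap.get(recordToRetain._key), an extra lookup in the full records map
      trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record);
    }
    return trimmedRecordsMap;
  }
}
```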

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
##########
@@ -214,23 +230,85 @@ private IntermediateRecord getIntermediateRecord(Key key, Record record) {
   }
 
   /**
-   * Helper class to store a subset of Record fields
-   * IntermediateRecord is derived from a Record
-   * Some of the main properties of an IntermediateRecord are:
-   *
-   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
-   * 2. For values, IntermediateRecord should only have the columns needed for order by
-   * 3. Inside the values, the columns should be ordered by the order by sequence
-   * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable
+   * Helper class to make an IntermediateRecord with the record.
    */
-  private static class IntermediateRecord {
-    final Key _key;
-    final Comparable[] _values;
+  private IntermediateRecord getInSegmentIntermediateRecord(Key key, Record record) {
+    Comparable[] intermediateRecordValues = new Comparable[_numOrderByExpressions];
+    for (int i = 0; i < _numOrderByExpressions; i++) {
+      intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+    }
+    return new IntermediateRecord(key, intermediateRecordValues, record);
+  }
 
-    IntermediateRecord(Key key, Comparable[] values) {
-      _key = key;
-      _values = values;
+  /**
+   * Trim the aggregation results using a priority queue and returns a the priority queue.
+   * This method is to be called from individual segment if the intermediate results need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.
+   */
+  public PriorityQueue<IntermediateRecord> trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,
+      GroupByResultHolder[] _groupByResultHolders, int size) {
+    if (!groupKeyIterator.hasNext() || _groupByResultHolders.length == 0 || size == 0) {
+      return new PriorityQueue<>();
+    }
+    int numAggregationFunctions = _aggregationFunctions.length;
+    int numColumns = numAggregationFunctions + _numGroupByExpressions;
+
+    // Get comparator
+    Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
+    PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
+    while (groupKeyIterator.hasNext()) {
+      // Iterate over keys
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      Object[] keys = groupKey._keys;
+      Object[] values = Arrays.copyOf(keys, numColumns);
+      int groupId = groupKey._groupId;
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        values[_numGroupByExpressions + i] =
+            _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i], groupId);
+      }
+      // {key, intermediate_record, record}
+      IntermediateRecord intermediateRecord = getInSegmentIntermediateRecord(new Key(keys), new Record(values));
+      if (priorityQueue.size() < size) {
+        priorityQueue.offer(intermediateRecord);
+      } else {
+        IntermediateRecord peek = priorityQueue.peek();
+        if (comparator.compare(peek, intermediateRecord) < 0) {
+          priorityQueue.poll();
+          priorityQueue.offer(intermediateRecord);
+        }
+      }
     }
+    return priorityQueue;
+  }
+
+  /**
+   * Build a list of intermediate record and return the list.
+   * This method is to be called from individual segment if the intermediate results doesn't need to be trimmed.
+   * The use case now is Multi-Segment GroupBy OrderBy query.
+   */
+  public List<IntermediateRecord> buildInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator,

Review comment:
      When trim is off, we should avoid creating the list because it could lead to memory issues.
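The trim path in `trimInSegmentResults()` is a bounded-heap top-K selection, and that bound on memory is exactly what an untrimmed list lacks. A generic sketch of the technique (not the PR's code; `TopKTrimmer` is a hypothetical name), where `ordering` ranks better records first:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;

final class TopKTrimmer {
  private TopKTrimmer() {
  }

  /**
   * Keeps the {@code size} best elements, where "better" means smaller under {@code ordering}
   * (earlier in the ORDER BY). The heap uses the reversed ordering, so its head is the worst
   * element retained so far and can be evicted when a better candidate arrives.
   */
  static <T> PriorityQueue<T> trim(Iterator<T> candidates, int size, Comparator<T> ordering) {
    // PriorityQueue rejects an initial capacity below 1, so clamp it
    PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, size), ordering.reversed());
    if (size <= 0) {
      return heap;
    }
    while (candidates.hasNext()) {
      T candidate = candidates.next();
      if (heap.size() < size) {
        heap.offer(candidate);
      } else if (ordering.compare(candidate, heap.peek()) < 0) {
        // Candidate beats the worst retained element: replace it
        heap.poll();
        heap.offer(candidate);
      }
    }
    return heap;
  }
}
```

The check `ordering.compare(candidate, heap.peek()) < 0` is equivalent to the PR's `comparator.compare(peek, intermediateRecord) < 0` on the reversed comparator. The heap never holds more than `size` elements, whereas a list of every group grows with the group count.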

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,
+      int threshold, TableResizer tableResizer) {

Review comment:
       ```suggestion
         int trimSize, TableResizer tableResizer) {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -117,6 +119,19 @@ public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
     _dataSchema = dataSchema;
   }
 
+  /**
+   * Constructor for aggregation group-by order-by result with {@link AggregationGroupByResult} and
+   * with a collection of intermediate records.
+   */
+  public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions,
+      @Nullable AggregationGroupByResult aggregationGroupByResults,

Review comment:
      Do we need to pass `aggregationGroupByResults` here? Also, the intermediate records should never be null.
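A sketch of the constructor, assuming the answer to that question is no, so the `AggregationGroupByResult` parameter is dropped. `TrimmedResultsBlock` is a hypothetical name, and `F`, `R` and `S` are placeholders for `AggregationFunction`, `IntermediateRecord` and `DataSchema`.

```java
import java.util.Collection;
import java.util.Objects;

/**
 * Sketch of a results block built only from trimmed intermediate records.
 * F, R and S are placeholders for AggregationFunction, IntermediateRecord and DataSchema.
 */
final class TrimmedResultsBlock<F, R, S> {
  private final F[] _aggregationFunctions;
  private final Collection<R> _intermediateRecords;
  private final S _dataSchema;

  // No AggregationGroupByResult parameter, assuming each record carries its cached source record
  TrimmedResultsBlock(F[] aggregationFunctions, Collection<R> intermediateRecords, S dataSchema) {
    _aggregationFunctions = aggregationFunctions;
    // Intermediate records are required on this path, so fail fast instead of accepting null
    _intermediateRecords = Objects.requireNonNull(intermediateRecords, "intermediateRecords");
    _dataSchema = dataSchema;
  }

  F[] getAggregationFunctions() {
    return _aggregationFunctions;
  }

  Collection<R> getIntermediateRecords() {
    return _intermediateRecords;
  }

  S getDataSchema() {
    return _dataSchema;
  }
}
```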

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
##########
@@ -144,4 +147,22 @@ protected void aggregate(TransformBlock transformBlock, int length, int function
   public AggregationGroupByResult getResult() {
     return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders);
   }
+
+  @Override
+  public Collection<TableResizer.IntermediateRecord> trimGroupByResult(boolean enableSegmentGroupTrim,

Review comment:
      Remove `enableSegmentGroupTrim`. When this method is invoked, it should always trim the result.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -51,40 +51,56 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity;
+
 
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
  */
 public class InstancePlanMakerImplV2 implements PlanMaker {
-  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
-
   public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
   public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
   public static final String NUM_GROUPS_LIMIT = "num.groups.limit";
+  public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim";
   public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;
-
+  public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false;
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
   // Limit on number of groups stored for each segment, beyond which no new group will be created
   private final int _numGroupsLimit;
   // Used for SQL GROUP BY (server combine)
   private final int _groupByTrimThreshold;
+  private final boolean _enableSegmentGroupTrim;
+
+  @VisibleForTesting
+  private int _inSegmentTrimLimit = -1;

Review comment:
      Since we need it for testing purposes, let's make it directly configurable as `_minSegmentTrimSize`, defaulting to 5000.
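A minimal sketch of the configurable field, with a plain `Map<String, String>` standing in for the server config. The key name `min.segment.trim.size` and the class name are hypothetical; only `_minSegmentTrimSize` and the 5000 default come from the comment.

```java
import java.util.Map;

final class MinSegmentTrimSizeConfig {
  // Hypothetical key; only the field name and the 5000 default come from the review
  static final String MIN_SEGMENT_TRIM_SIZE = "min.segment.trim.size";
  static final int DEFAULT_MIN_SEGMENT_TRIM_SIZE = 5000;

  private final int _minSegmentTrimSize;

  MinSegmentTrimSizeConfig(Map<String, String> config) {
    String value = config.get(MIN_SEGMENT_TRIM_SIZE);
    // Fall back to the default when the key is absent
    _minSegmentTrimSize = value != null ? Integer.parseInt(value.trim()) : DEFAULT_MIN_SEGMENT_TRIM_SIZE;
  }

  int getMinSegmentTrimSize() {
    return _minSegmentTrimSize;
  }
}
```

Tests can then pass a small value through the config instead of writing to a `@VisibleForTesting` field.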

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
##########
@@ -281,6 +296,16 @@ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  /**
+   * Get an iterator for the intermediate record collection. Should only be called if _intermediateCollection is present
+   */
+  public Iterator<TableResizer.IntermediateRecord> getIntermediateResultIterator() {

Review comment:
      Return the collection directly instead of an iterator, and annotate it with `@Nullable`.
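A sketch of how a caller might consume the nullable collection, using a simplified, hypothetical `ResultsBlock` in place of `IntermediateResultsBlock`. The `@Nullable` annotation the PR already uses is left as a comment so the snippet has no dependencies.

```java
import java.util.Collection;

final class IntermediateRecordsConsumer {
  private IntermediateRecordsConsumer() {
  }

  // Simplified stand-in for IntermediateResultsBlock
  static final class ResultsBlock<R> {
    // Null when the segment result was not trimmed (old path)
    private final Collection<R> _intermediateRecords;

    ResultsBlock(Collection<R> intermediateRecords) {
      _intermediateRecords = intermediateRecords;
    }

    // Would be annotated @Nullable in the PR
    Collection<R> getIntermediateRecords() {
      return _intermediateRecords;
    }
  }

  // Returns how many trimmed records a combine step would merge, or -1 when it must use the old path
  static <R> int numRecordsToMerge(ResultsBlock<R> block) {
    Collection<R> records = block.getIntermediateRecords();
    if (records == null) {
      return -1;
    }
    // A Collection exposes size() (e.g. for pre-sizing a merge target), which an Iterator does not
    return records.size();
  }
}
```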

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -106,8 +116,13 @@ protected IntermediateResultsBlock getNextBlock() {
       groupByExecutor.process(transformBlock);
     }
 
+    if (!_tableResizer.getOrderByStatus()) {
+      return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    }
+    Collection<TableResizer.IntermediateRecord> intermediate =
+        groupByExecutor.trimGroupByResult(_enableSegmentGroupTrim, _inSegmentResultLimit, _tableResizer);
     // Build intermediate result block based on aggregation group-by result from the executor
-    return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema);
+    return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), intermediate, _dataSchema);

Review comment:
       Fall back to the old behavior when there is no segment trim. Keeping a 
list can increase the memory overhead
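A sketch of the branching in `getNextBlock()`, assuming the single `_trimSize` setting with negative meaning "trim off" as suggested earlier in this review. `SegmentResultBuilder` and `SegmentResult` are hypothetical; the `Supplier` and `IntFunction` stand in for `groupByExecutor.getResult()` and `groupByExecutor.trimGroupByResult(...)`.

```java
import java.util.Collection;
import java.util.function.IntFunction;
import java.util.function.Supplier;

final class SegmentResultBuilder {
  private SegmentResultBuilder() {
  }

  // Exactly one of the two fields is set
  static final class SegmentResult<G, R> {
    final G _groupByResult;                    // old path: executor result handed over as-is
    final Collection<R> _intermediateRecords;  // trim path: at most trimSize records

    SegmentResult(G groupByResult, Collection<R> intermediateRecords) {
      _groupByResult = groupByResult;
      _intermediateRecords = intermediateRecords;
    }
  }

  // A negative trimSize means segment trim is off (assumption from the _trimSize suggestion)
  static <G, R> SegmentResult<G, R> build(int trimSize, Supplier<G> groupByResult,
      IntFunction<Collection<R>> trimmer) {
    if (trimSize < 0) {
      // No segment trim: keep the old behavior and never materialize a record list
      return new SegmentResult<>(groupByResult.get(), null);
    }
    return new SegmentResult<>(null, trimmer.apply(trimSize));
  }
}
```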

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -85,6 +94,7 @@ public AggregationGroupByOrderByOperator(AggregationFunction[] aggregationFuncti
     }
 
     _dataSchema = new DataSchema(columnNames, columnDataTypes);
+    _tableResizer = new TableResizer(_dataSchema, queryContext);

Review comment:
      Don't create the table resizer here; create it in `getNextBlock()` only when it is needed.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -137,8 +206,15 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer
         QueryOptions queryOptions = new QueryOptions(queryContext.getQueryOptions());
         // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
         if (queryOptions.isGroupByModeSQL()) {
+          // Calculate trim limit = max(limit * 5, 5000)

Review comment:
      Don't calculate it here; move the calculation into `AggregationGroupByOrderByPlanNode`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -105,8 +121,61 @@ public InstancePlanMakerImplV2(QueryExecutorConfig queryExecutorConfig) {
     Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
         "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
         _maxInitialResultHolderCapacity, _numGroupsLimit);
-    LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}",
-        _maxInitialResultHolderCapacity, _numGroupsLimit);
+    _enableSegmentGroupTrim =
+        queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, DEFAULT_ENABLE_SEGMENT_GROUP_TRIM);
+    LOGGER.info(
+        "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, enableSegmentTrim: {}",
+        _maxInitialResultHolderCapacity, _numGroupsLimit, _enableSegmentGroupTrim);
+  }
+
+  /**
+   * Returns {@code true} if the given aggregation-only without filter QueryContext can be solved with segment metadata,
+   * {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
+   */
+  @VisibleForTesting
+  static boolean isFitForMetadataBasedPlan(QueryContext queryContext) {

Review comment:
      Why move these two methods? It seems unrelated to this change.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -43,21 +43,27 @@
   private final IndexSegment _indexSegment;
   private final int _maxInitialResultHolderCapacity;
   private final int _numGroupsLimit;
+  private final boolean _enableSegmentGroupTrim;
+  private final int _inSegmentTrimLimit;

Review comment:
      Change it to `_minSegmentTrimSize` and make it default to 5000; then we can pass it to `GroupByUtils` to calculate the actual trim size.
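A sketch of the calculation, assuming the formula from the plan-maker comment in this PR (`max(limit * 5, 5000)`) with 5000 replaced by `_minSegmentTrimSize`. The two-argument `getTableCapacity` is a hypothetical overload; the PR only shows `GroupByUtils.getTableCapacity` being imported, not its signature.

```java
final class TrimSizeCalculator {
  private TrimSizeCalculator() {
  }

  /**
   * Hypothetical two-argument variant of getTableCapacity: max(limit * 5, minSegmentTrimSize).
   * Long arithmetic avoids int overflow for very large LIMIT values.
   */
  static int getTableCapacity(int limit, int minSegmentTrimSize) {
    long capacity = Math.max((long) limit * 5, minSegmentTrimSize);
    return (int) Math.min(capacity, Integer.MAX_VALUE);
  }
}
```

For `LIMIT 10` this yields 5000, and for `LIMIT 2000` it yields 10000.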




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


