Jackie-Jiang commented on a change in pull request #7113:
URL: https://github.com/apache/incubator-pinot/pull/7113#discussion_r665779622



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -97,14 +116,38 @@ public AggregationGroupByOrderByOperator run() {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     if (_transformPlanNode != null) {
       // Do not use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, 
_groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, 
_minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
-          _queryContext, false);
+      return new AggregationGroupByOrderByOperator(_indexSegment, 
_aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), 
_maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), 
numTotalDocs, _queryContext,
+          _enableGroupByOpt, false);
     } else {
       // Use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, 
_groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, 
_minSegmentTrimSize, _starTreeTransformPlanNode.run(),
-          numTotalDocs, _queryContext, true);
+      return new AggregationGroupByOrderByOperator(_indexSegment, 
_aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), 
_maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, 
_starTreeTransformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, true);
+    }
+  }
+
+  private boolean checkOrderByOptimization() {
+    if (_queryContext.getHavingFilter() != null) {
+      return false;
+    }
+    Set<ExpressionContext> orderByExpressionsSet = new HashSet<>();
+    // Filter out function expressions
+    for (OrderByExpressionContext orderByExpressionContext : 
_orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      if (expression.getType() == ExpressionContext.Type.FUNCTION) {

Review comment:
       We should only skip `AGGREGATION` functions, not `TRANSFORM` functions.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -95,6 +96,8 @@ private static QueryContext convertSQL(BrokerRequest 
brokerRequest) {
       for (Expression thriftExpression : groupByList) {
         
groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression));
       }
+      
pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.GROUP_BY_MODE,

Review comment:
       This should not be required because we already have a check on the 
broker side to ensure that the SQL query always has this option set.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -87,6 +101,11 @@ public AggregationGroupByOrderByPlanNode(IndexSegment 
indexSegment, QueryContext
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, 
_groupByExpressions);
+    _enableGroupByOpt = checkOrderByOptimization();

Review comment:
       It seems most of the checks for enabling the optimization rely on the 
column metadata, and have to happen in the operator. Let's move the whole check 
inside the operator then. Splitting the checks across 2 places is hard to manage.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -97,14 +116,38 @@ public AggregationGroupByOrderByOperator run() {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     if (_transformPlanNode != null) {
       // Do not use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, 
_groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, 
_minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
-          _queryContext, false);
+      return new AggregationGroupByOrderByOperator(_indexSegment, 
_aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), 
_maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), 
numTotalDocs, _queryContext,
+          _enableGroupByOpt, false);
     } else {
       // Use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, 
_groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, 
_minSegmentTrimSize, _starTreeTransformPlanNode.run(),
-          numTotalDocs, _queryContext, true);
+      return new AggregationGroupByOrderByOperator(_indexSegment, 
_aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), 
_maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, 
_starTreeTransformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, true);
+    }
+  }
+
+  private boolean checkOrderByOptimization() {
+    if (_queryContext.getHavingFilter() != null) {
+      return false;
+    }
+    Set<ExpressionContext> orderByExpressionsSet = new HashSet<>();
+    // Filter out function expressions
+    for (OrderByExpressionContext orderByExpressionContext : 
_orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      if (expression.getType() == ExpressionContext.Type.FUNCTION) {
+        return false;
+      }
+      orderByExpressionsSet.add(expression);
+    }
+    // Add group by expressions to order by expressions
+    for (ExpressionContext groupByExpression: _groupByExpressions) {
+      if (!orderByExpressionsSet.contains(groupByExpression)) {
+        _orderByExpressionContexts.add(new 
OrderByExpressionContext(groupByExpression, true));

Review comment:
       We should not directly modify the query context because it is shared 
among multiple segments, and modifying it can cause a race condition. Ideally 
this rewrite should be done either on the broker side or before query planning 
on the server side.
   For now, we can keep a local copy of the order by expressions list.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -168,4 +198,226 @@ private int calculateMinSegmentTrimSize() {
     }
     return _minSegmentTrimSize;
   }
+
+  private TransformOperator constructTransformOperator() {
+    List<TransformResultMetadata> orderByExpressionMetadataList = new 
ArrayList<>();
+    for (OrderByExpressionContext orderByExpressionContext : 
_orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      TransformResultMetadata orderByExpressionMetadata = 
_transformOperator.getResultMetadata(expression);
+      // Only handle single value column now
+      if (!orderByExpressionMetadata.isSingleValue()) {
+        return _transformOperator;
+      }
+      orderByExpressionMetadataList.add(orderByExpressionMetadata);
+    }
+    return 
constructNewTransformOperator(orderByExpressionMetadataList.toArray(new 
TransformResultMetadata[0]));
+  }
+
+  /**
+   * Two pass approach for orderBy on groupBy columns. Fetch the orderBy 
columns to rank the top results
+   * whose docIds will be used to construct a new transform operator for 
aggregations.
+   */
+  private TransformOperator 
constructNewTransformOperator(TransformResultMetadata[] 
orderByExpressionMetadata) {
+    int numOrderByExpressions = _orderByExpressionContexts.length;
+    HashMap<Key, MutableRoaringBitmap> groupByKeyMap = new HashMap<>();
+    TransformBlock transformBlock;
+
+    Dictionary[] dictionaries = new Dictionary[numOrderByExpressions];
+    boolean[] hasDict = new boolean[numOrderByExpressions];
+    int numNoDict = 0;
+    long cardinalityProduct = 1L;
+    boolean longOverflow = false;
+    // Get dictionaries and calculate cardinalities
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      ExpressionContext expression = 
_orderByExpressionContexts[i].getExpression();
+      hasDict[i] = orderByExpressionMetadata[i].hasDictionary();
+      if (hasDict[i]) {
+        dictionaries[i] = _transformOperator.getDictionary(expression);
+        int cardinality = dictionaries[i].length();
+        if (!longOverflow) {
+          if (cardinalityProduct > Long.MAX_VALUE / cardinality) {
+            longOverflow = true;
+          } else {
+            cardinalityProduct *= cardinality;
+          }
+        }
+      }
+      numNoDict += hasDict[i] ? 0 : 1;
+    }
+    //TODO: Determine reasonable threshold
+    if (!longOverflow && cardinalityProduct < _limit || cardinalityProduct < 
500000) {
+      return _transformOperator;
+    }
+    BlockValSet[] blockValSets = new BlockValSet[numNoDict];
+    PriorityQueue<Object[]> PQ = new PriorityQueue<>(_limit,
+        getComparator(orderByExpressionMetadata, numOrderByExpressions, 
dictionaries, hasDict));
+    int[][] dictionaryIds = new int[numOrderByExpressions - numNoDict][];
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      int numDocsFetched = transformBlock.getNumDocs();
+      int[] docIds = 
transformBlock.getBlockValueSet("$docId").getIntValuesSV();
+      int dictionaryIdsIndex = 0;
+      // For dictionary-based columns, we fetch the dictionary ids. Otherwise 
fetch the actual value
+      for (int i = 0; i < numOrderByExpressions; i++) {
+        ExpressionContext expression = 
_orderByExpressionContexts[i].getExpression();
+        BlockValSet blockValSet = transformBlock.getBlockValueSet(expression);
+        if (hasDict[i]) {
+          dictionaryIds[dictionaryIdsIndex] = blockValSet.getDictionaryIdsSV();
+          dictionaryIdsIndex++;
+        } else {
+          blockValSets[i - dictionaryIdsIndex] = blockValSet;
+        }
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
+      // TODO: Add special optimization for all dict condition
+      for (int i = 0; i < numDocsFetched; i++) {
+        int docId = docIds[i];
+        // Generate key based on the dictionary Id/fetched values
+        Object[] keys = new Object[numOrderByExpressions];
+        Object[] row = new Object[numNoDict];
+        if (numNoDict != 0) {
+          blockValueFetcher.getRow(i, row, 0);
+        }
+        dictionaryIdsIndex = 0;
+        for (int j = 0; j < numOrderByExpressions; j++) {
+          if (hasDict[j]) {
+            keys[j] = dictionaryIds[dictionaryIdsIndex][i];
+            dictionaryIdsIndex++;
+          } else {
+            keys[j] = row[j - dictionaryIdsIndex];
+          }
+        }
+        AddToObjectPriorityQueue(keys, docId, PQ, groupByKeyMap);
+      }
+    }
+    // Collect docIds
+    Collection<MutableRoaringBitmap> docIdList = groupByKeyMap.values();
+    int numDocs = 0;
+    MutableRoaringBitmap docIds = new MutableRoaringBitmap();
+    for (MutableRoaringBitmap filteredDocIds : docIdList) {
+      for (Integer docId : filteredDocIds) {
+        docIds.add(docId);
+        numDocs++;
+      }
+    }
+
+    // Make a new transform operator
+    Set<ExpressionContext> expressionsToTransform =
+        
AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, 
_groupByExpressions);
+    Set<String> columns = new HashSet<>();
+    for (ExpressionContext expression : expressionsToTransform) {
+      expression.getColumns(columns);
+    }
+    Map<String, DataSource> dataSourceMap = new HashMap<>();

Review comment:
       The `DataSource` should be retrieved from the `ProjectionOperator` 
instead of the segment because the star-tree won't share the same data source




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to