This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ec7dee  Optimize selection order-by when not all selected expressions are ordered (#5661)
2ec7dee is described below

commit 2ec7dee1597021742f68f0ae8b279f7560e55894
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jul 8 07:30:20 2020 -0700

    Optimize selection order-by when not all selected expressions are ordered (#5661)
    
    * Optimize selection order-by when not all selected expressions are ordered
    
    * Address comments
---
 .../core/common/RowBasedBlockValueFetcher.java     |   6 +
 .../pinot/core/operator/ProjectionOperator.java    |   9 +-
 .../operator/query/SelectionOrderByOperator.java   | 236 ++++++++++++++++++---
 .../plan/AggregationGroupByOrderByPlanNode.java    |   3 +-
 .../core/plan/AggregationGroupByPlanNode.java      |   3 +-
 .../pinot/core/plan/AggregationPlanNode.java       |   3 +-
 .../apache/pinot/core/plan/ProjectionPlanNode.java |   5 +-
 .../apache/pinot/core/plan/SelectionPlanNode.java  |  33 ++-
 .../apache/pinot/core/plan/TransformPlanNode.java  |  35 +--
 .../BrokerRequestToQueryContextConverter.java      |  19 +-
 .../query/selection/SelectionOperatorUtils.java    |   9 +-
 ...InnerSegmentSelectionMultiValueQueriesTest.java |   6 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  71 ++++++-
 .../queries/SelectionOnlyEarlyTerminationTest.java |   7 +-
 .../DictionaryBasedGroupKeyGeneratorTest.java      |   4 +-
 .../groupby/NoDictionaryGroupKeyGeneratorTest.java |   4 +-
 16 files changed, 364 insertions(+), 89 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
index 31b5805..9faa4c1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/RowBasedBlockValueFetcher.java
@@ -42,6 +42,12 @@ public class RowBasedBlockValueFetcher {
     return row;
   }
 
+  public void getRow(int docId, Object[] buffer, int startIndex) {
+    for (ValueFetcher valueFetcher : _valueFetchers) {
+      buffer[startIndex++] = valueFetcher.getValue(docId);
+    }
+  }
+
   private ValueFetcher createFetcher(BlockValSet blockValSet) {
     DataType valueType = blockValSet.getValueType();
     if (blockValSet.isSingleValue()) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
index 00dc5c8..32ef7c8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.operator;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.core.common.Block;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataBlockCache;
 import org.apache.pinot.core.common.DataFetcher;
 import org.apache.pinot.core.common.DataSource;
@@ -37,7 +37,8 @@ public class ProjectionOperator extends 
BaseOperator<ProjectionBlock> {
   private final BaseOperator<DocIdSetBlock> _docIdSetOperator;
   private final DataBlockCache _dataBlockCache;
 
-  public ProjectionOperator(Map<String, DataSource> dataSourceMap, 
BaseOperator<DocIdSetBlock> docIdSetOperator) {
+  public ProjectionOperator(Map<String, DataSource> dataSourceMap,
+      @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator) {
     _dataSourceMap = dataSourceMap;
     _dataSourceMetadataMap = new HashMap<>(dataSourceMap.size());
     for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
@@ -58,6 +59,8 @@ public class ProjectionOperator extends 
BaseOperator<ProjectionBlock> {
 
   @Override
   protected ProjectionBlock getNextBlock() {
+    // NOTE: Should not be called when _docIdSetOperator is null.
+    assert _docIdSetOperator != null;
     DocIdSetBlock docIdSetBlock = _docIdSetOperator.nextBlock();
     if (docIdSetBlock == null) {
       return null;
@@ -74,6 +77,6 @@ public class ProjectionOperator extends 
BaseOperator<ProjectionBlock> {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _docIdSetOperator.getExecutionStatistics();
+    return _docIdSetOperator != null ? 
_docIdSetOperator.getExecutionStatistics() : new ExecutionStatistics(0, 0, 0, 
0);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 2bd0ca6..e13ef48 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -20,72 +20,97 @@ package org.apache.pinot.core.operator.query;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
+import 
org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
+/**
+ * Operator for selection order-by queries.
+ * <p>The operator uses a priority queue to sort the rows and return the top 
rows based on the order-by expressions.
+ * <p>It is optimized to fetch only the values needed for the ordering purpose 
and the final result:
+ * <ul>
+ *   <li>
+ *     When all the output expressions are ordered, the operator fetches all the output expressions and inserts them into
+ *     the priority queue because all the values are needed for ordering.
+ *   </li>
+ *   <li>
+ *     Otherwise, the operator fetches only the order-by expressions and the virtual document id column and inserts them
+ *     into the priority queue. After getting the top rows, the operator does a second-round scan only on the document
+ *     ids for the top rows for the non-order-by output expressions. This optimization can significantly reduce the
+ *     scanning and improve the query performance when most/all of the output expressions are not ordered (e.g. SELECT *
+ *     FROM table ORDER BY col).
+ *   </li>
+ * </ul>
+ */
 public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBlock> {
   private static final String OPERATOR_NAME = "SelectionOrderByOperator";
 
   private final IndexSegment _indexSegment;
-  private final TransformOperator _transformOperator;
+  // Deduped order-by expressions followed by output expressions from 
SelectionOperatorUtils.extractExpressions()
   private final List<ExpressionContext> _expressions;
-  private final TransformResultMetadata[] _expressionMetadata;
-  private final DataSchema _dataSchema;
+  private final TransformOperator _transformOperator;
+  private final List<OrderByExpressionContext> _orderByExpressions;
+  private final TransformResultMetadata[] _orderByExpressionMetadata;
   private final int _numRowsToKeep;
   private final PriorityQueue<Object[]> _rows;
 
   private int _numDocsScanned = 0;
+  private long _numEntriesScannedPostFilter = 0;
 
   public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext 
queryContext,
       List<ExpressionContext> expressions, TransformOperator 
transformOperator) {
     _indexSegment = indexSegment;
-    _transformOperator = transformOperator;
     _expressions = expressions;
+    _transformOperator = transformOperator;
 
-    int numExpressions = _expressions.size();
-    _expressionMetadata = new TransformResultMetadata[numExpressions];
-    String[] columnNames = new String[numExpressions];
-    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
-    for (int i = 0; i < numExpressions; i++) {
-      ExpressionContext expression = _expressions.get(i);
-      TransformResultMetadata expressionMetadata = 
_transformOperator.getResultMetadata(expression);
-      _expressionMetadata[i] = expressionMetadata;
-      columnNames[i] = expression.toString();
-      columnDataTypes[i] =
-          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    _orderByExpressions = queryContext.getOrderByExpressions();
+    assert _orderByExpressions != null;
+    int numOrderByExpressions = _orderByExpressions.size();
+    _orderByExpressionMetadata = new 
TransformResultMetadata[numOrderByExpressions];
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      ExpressionContext expression = 
_orderByExpressions.get(i).getExpression();
+      _orderByExpressionMetadata[i] = 
_transformOperator.getResultMetadata(expression);
     }
-    _dataSchema = new DataSchema(columnNames, columnDataTypes);
 
-    List<OrderByExpressionContext> orderByExpressions = 
queryContext.getOrderByExpressions();
-    assert orderByExpressions != null;
     _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        getComparator(orderByExpressions));
+        getComparator());
   }
 
-  private Comparator<Object[]> getComparator(List<OrderByExpressionContext> 
orderByExpressions) {
+  private Comparator<Object[]> getComparator() {
     // Compare all single-value columns
-    int numOrderByExpressions = orderByExpressions.size();
+    int numOrderByExpressions = _orderByExpressions.size();
     List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
     for (int i = 0; i < numOrderByExpressions; i++) {
-      if (_expressionMetadata[i].isSingleValue()) {
+      if (_orderByExpressionMetadata[i].isSingleValue()) {
         valueIndexList.add(i);
       }
     }
@@ -98,8 +123,8 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
     for (int i = 0; i < numValuesToCompare; i++) {
       int valueIndex = valueIndexList.get(i);
       valueIndices[i] = valueIndex;
-      dataTypes[i] = _expressionMetadata[valueIndex].getDataType();
-      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
+      dataTypes[i] = _orderByExpressionMetadata[valueIndex].getDataType();
+      multipliers[i] = _orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
     }
 
     return (o1, o2) -> {
@@ -143,24 +168,146 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
+    if (_expressions.size() == _orderByExpressions.size()) {
+      return computeAllOrdered();
+    } else {
+      return computePartiallyOrdered();
+    }
+  }
+
+  /**
+   * Helper method to compute the result when all the output expressions are 
ordered.
+   */
+  private IntermediateResultsBlock computeAllOrdered() {
+    int numExpressions = _expressions.size();
+
+    // Fetch all the expressions and insert them into the priority queue
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      int numExpressions = _expressions.size();
-      BlockValSet[] blockValSets = new BlockValSet[numExpressions];
       for (int i = 0; i < numExpressions; i++) {
         ExpressionContext expression = _expressions.get(i);
         blockValSets[i] = transformBlock.getBlockValueSet(expression);
       }
       RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
-
       int numDocsFetched = transformBlock.getNumDocs();
       _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), 
_rows, _numRowsToKeep);
       }
     }
+    _numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
 
-    return new IntermediateResultsBlock(_dataSchema, _rows);
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+      TransformResultMetadata expressionMetadata = 
_orderByExpressionMetadata[i];
+      columnDataTypes[i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    return new IntermediateResultsBlock(dataSchema, _rows);
+  }
+
+  /**
+   * Helper method to compute the result when not all the output expressions 
are ordered.
+   */
+  private IntermediateResultsBlock computePartiallyOrdered() {
+    int numExpressions = _expressions.size();
+    int numOrderByExpressions = _orderByExpressions.size();
+
+    // Fetch the order-by expressions and docIds and insert them into the 
priority queue
+    BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions + 1];
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      for (int i = 0; i < numOrderByExpressions; i++) {
+        ExpressionContext expression = 
_orderByExpressions.get(i).getExpression();
+        blockValSets[i] = transformBlock.getBlockValueSet(expression);
+      }
+      blockValSets[numOrderByExpressions] = 
transformBlock.getBlockValueSet(BuiltInVirtualColumn.DOCID);
+      RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
+      int numDocsFetched = transformBlock.getNumDocs();
+      _numDocsScanned += numDocsFetched;
+      for (int i = 0; i < numDocsFetched; i++) {
+        // NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values later
+        //       without creating extra rows or re-constructing the priority queue. We can change the values in-place
+        //       because the comparator only compares the values for the order-by expressions.
+        Object[] row = new Object[numExpressions];
+        blockValueFetcher.getRow(i, row, 0);
+        SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+      }
+    }
+
+    // Copy the rows (shallow copy so that any modification will also be reflected in the priority queue) into a list,
+    // and store the document ids into a bitmap
+    int numRows = _rows.size();
+    List<Object[]> rowList = new ArrayList<>(numRows);
+    MutableRoaringBitmap docIds = new MutableRoaringBitmap();
+    for (Object[] row : _rows) {
+      rowList.add(row);
+      int docId = (int) row[numOrderByExpressions];
+      docIds.add(docId);
+    }
+
+    // Sort the rows with docIds to match the order of the bitmap (bitmap 
always returns values in ascending order)
+    rowList.sort(Comparator.comparingInt(o -> (int) o[numOrderByExpressions]));
+
+    // Construct a new TransformOperator to fetch the non-order-by expressions 
for the top rows
+    List<ExpressionContext> nonOrderByExpressions = 
_expressions.subList(numOrderByExpressions, numExpressions);
+    Set<String> columns = new HashSet<>();
+    for (ExpressionContext expressionContext : nonOrderByExpressions) {
+      expressionContext.getColumns(columns);
+    }
+    Map<String, DataSource> dataSourceMap = new HashMap<>();
+    for (String column : columns) {
+      dataSourceMap.put(column, _indexSegment.getDataSource(column));
+    }
+    ProjectionOperator projectionOperator =
+        new ProjectionOperator(dataSourceMap, new 
BitmapDocIdSetOperator(docIds, numRows));
+    TransformOperator transformOperator = new 
TransformOperator(projectionOperator, nonOrderByExpressions);
+
+    // Fill the non-order-by expression values
+    int numNonOrderByExpressions = nonOrderByExpressions.size();
+    blockValSets = new BlockValSet[numNonOrderByExpressions];
+    int rowBaseId = 0;
+    while ((transformBlock = transformOperator.nextBlock()) != null) {
+      for (int i = 0; i < numNonOrderByExpressions; i++) {
+        ExpressionContext expression = nonOrderByExpressions.get(i);
+        blockValSets[i] = transformBlock.getBlockValueSet(expression);
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
+      int numDocsFetched = transformBlock.getNumDocs();
+      for (int i = 0; i < numDocsFetched; i++) {
+        blockValueFetcher.getRow(i, rowList.get(rowBaseId + i), 
numOrderByExpressions);
+      }
+      rowBaseId += numDocsFetched;
+    }
+    _numEntriesScannedPostFilter =
+        (long) _numDocsScanned * _transformOperator.getNumColumnsProjected() + 
(long) numRows * transformOperator
+            .getNumColumnsProjected();
+
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+    }
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      TransformResultMetadata expressionMetadata = 
_orderByExpressionMetadata[i];
+      columnDataTypes[i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    for (int i = 0; i < numNonOrderByExpressions; i++) {
+      TransformResultMetadata expressionMetadata = 
transformOperator.getResultMetadata(nonOrderByExpressions.get(i));
+      columnDataTypes[numOrderByExpressions + i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    return new IntermediateResultsBlock(dataSchema, _rows);
   }
 
   @Override
@@ -171,9 +318,38 @@ public class SelectionOrderByOperator extends 
BaseOperator<IntermediateResultsBl
   @Override
   public ExecutionStatistics getExecutionStatistics() {
     long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = (long) _numDocsScanned * 
_transformOperator.getNumColumnsProjected();
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
-    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
numEntriesScannedPostFilter,
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
_numEntriesScannedPostFilter,
         numTotalDocs);
   }
+
+  private static class BitmapDocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
+    static final String OPERATOR_NAME = "BitmapDocIdSetOperator";
+
+    final IntIterator _docIdIterator;
+    final int[] _docIdBuffer;
+
+    BitmapDocIdSetOperator(ImmutableRoaringBitmap docIds, int numDocs) {
+      _docIdIterator = docIds.getIntIterator();
+      _docIdBuffer = new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
+    }
+
+    @Override
+    protected DocIdSetBlock getNextBlock() {
+      int numDocIdsFilled = 0;
+      while (numDocIdsFilled < DocIdSetPlanNode.MAX_DOC_PER_CALL && 
_docIdIterator.hasNext()) {
+        _docIdBuffer[numDocIdsFilled++] = _docIdIterator.next();
+      }
+      if (numDocIdsFilled > 0) {
+        return new DocIdSetBlock(_docIdBuffer, numDocIdsFilled);
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public String getOperatorName() {
+      return OPERATOR_NAME;
+    }
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
index 5c1b7f5..5e46dae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -93,7 +93,8 @@ public class AggregationGroupByOrderByPlanNode implements 
PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, 
_groupByExpressions);
-    _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform);
+    _transformPlanNode =
+        new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _starTreeTransformPlanNode = null;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
index 77f4ad4..2f1b08e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java
@@ -93,7 +93,8 @@ public class AggregationGroupByPlanNode implements PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, 
_groupByExpressions);
-    _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform);
+    _transformPlanNode =
+        new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _starTreeTransformPlanNode = null;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index c25261c..0255fca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -83,7 +83,8 @@ public class AggregationPlanNode implements PlanNode {
 
     Set<ExpressionContext> expressionsToTransform =
         
AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, 
null);
-    _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform);
+    _transformPlanNode =
+        new TransformPlanNode(_indexSegment, queryContext, 
expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _starTreeTransformPlanNode = null;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
index 4bc7d28..3dad21d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectionPlanNode.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.plan;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.ProjectionOperator;
@@ -36,7 +37,7 @@ public class ProjectionPlanNode implements PlanNode {
   private final DocIdSetPlanNode _docIdSetPlanNode;
 
   public ProjectionPlanNode(IndexSegment indexSegment, Set<String> 
projectionColumns,
-      DocIdSetPlanNode docIdSetPlanNode) {
+      @Nullable DocIdSetPlanNode docIdSetPlanNode) {
     _indexSegment = indexSegment;
     _projectionColumns = projectionColumns;
     _docIdSetPlanNode = docIdSetPlanNode;
@@ -48,6 +49,6 @@ public class ProjectionPlanNode implements PlanNode {
     for (String column : _projectionColumns) {
       dataSourceMap.put(column, _indexSegment.getDataSource(column));
     }
-    return new ProjectionOperator(dataSourceMap, _docIdSetPlanNode.run());
+    return new ProjectionOperator(dataSourceMap, _docIdSetPlanNode != null ? 
_docIdSetPlanNode.run() : null);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index 48fedce..f2f743d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.plan;
 
+import java.util.ArrayList;
 import java.util.List;
+import 
org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -27,6 +29,7 @@ import 
org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 
@@ -44,7 +47,35 @@ public class SelectionPlanNode implements PlanNode {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _expressions = SelectionOperatorUtils.extractExpressions(queryContext, 
indexSegment);
-    _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, 
_expressions);
+    int limit = queryContext.getLimit();
+    if (limit > 0) {
+      List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
+      if (orderByExpressions == null) {
+        // Selection only
+        _transformPlanNode = new TransformPlanNode(_indexSegment, 
queryContext, _expressions,
+            Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL));
+      } else {
+        // Selection order-by
+        if (orderByExpressions.size() == _expressions.size()) {
+          // All output expressions are ordered
+          _transformPlanNode =
+              new TransformPlanNode(_indexSegment, queryContext, _expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+        } else {
+          // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
+          // unnecessary data fetch
+          List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(orderByExpressions.size() + 1);
+          for (OrderByExpressionContext orderByExpression : 
orderByExpressions) {
+            expressionsToTransform.add(orderByExpression.getExpression());
+          }
+          
expressionsToTransform.add(ExpressionContext.forIdentifier(BuiltInVirtualColumn.DOCID));
+          _transformPlanNode = new TransformPlanNode(_indexSegment, 
queryContext, expressionsToTransform,
+              DocIdSetPlanNode.MAX_DOC_PER_CALL);
+        }
+      }
+    } else {
+      // Empty selection (LIMIT 0)
+      _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, 
_expressions, 0);
+    }
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index c3ee5e5..d69d53e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -25,7 +25,6 @@ import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 
 
 /**
@@ -36,40 +35,16 @@ public class TransformPlanNode implements PlanNode {
   private final ProjectionPlanNode _projectionPlanNode;
 
   public TransformPlanNode(IndexSegment indexSegment, QueryContext 
queryContext,
-      Collection<ExpressionContext> expressions) {
+      Collection<ExpressionContext> expressions, int maxDocsPerCall) {
     _expressions = expressions;
     Set<String> projectionColumns = new HashSet<>();
     for (ExpressionContext expression : expressions) {
       expression.getColumns(projectionColumns);
     }
-    _projectionPlanNode = new ProjectionPlanNode(indexSegment, 
projectionColumns,
-        new DocIdSetPlanNode(indexSegment, queryContext, 
getMaxDocsPerCall(queryContext)));
-  }
-
-  /**
-   * Helper method to get the max number of documents returned in each block.
-   */
-  private int getMaxDocsPerCall(QueryContext queryContext) {
-    if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      // Aggregation query
-      return DocIdSetPlanNode.MAX_DOC_PER_CALL;
-    } else {
-      // Selection query
-      int limit = queryContext.getLimit();
-      if (limit > 0) {
-        if (queryContext.getOrderByExpressions() == null) {
-          // For selection-only queries, select minimum number of documents
-          return Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
-        } else {
-          // Selection order-by query
-          return DocIdSetPlanNode.MAX_DOC_PER_CALL;
-        }
-      } else {
-        // For LIMIT 0 queries, fetch at least 1 document per 
DocIdSetPlanNode's requirement
-        // TODO: Skip the filtering phase and document fetching for LIMIT 0 
case
-        return 1;
-      }
-    }
+    // NOTE: Skip creating DocIdSetPlanNode when maxDocsPerCall is 0 (for 
selection query with LIMIT 0).
+    DocIdSetPlanNode docIdSetPlanNode =
+        maxDocsPerCall > 0 ? new DocIdSetPlanNode(indexSegment, queryContext, 
maxDocsPerCall) : null;
+    _projectionPlanNode = new ProjectionPlanNode(indexSegment, 
projectionColumns, docIdSetPlanNode);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index 0883795..8a2ef7e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.query.request.context.utils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.AggregationInfo;
@@ -150,23 +152,30 @@ public class BrokerRequestToQueryContextConverter {
     if (pinotQuery != null) {
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (CollectionUtils.isNotEmpty(orderByList)) {
+        // Deduplicate the order-by expressions
         orderByExpressions = new ArrayList<>(orderByList.size());
+        Set<ExpressionContext> expressionSet = new HashSet<>();
         for (Expression orderBy : orderByList) {
           // NOTE: Order-by is always a Function with the ordering of the 
Expression
           Function thriftFunction = orderBy.getFunctionCall();
-          boolean isAsc = thriftFunction.getOperator().equalsIgnoreCase("ASC");
           ExpressionContext expression = 
QueryContextConverterUtils.getExpression(thriftFunction.getOperands().get(0));
-          orderByExpressions.add(new OrderByExpressionContext(expression, 
isAsc));
+          if (expressionSet.add(expression)) {
+            boolean isAsc = 
thriftFunction.getOperator().equalsIgnoreCase("ASC");
+            orderByExpressions.add(new OrderByExpressionContext(expression, 
isAsc));
+          }
         }
       }
     } else {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       if (CollectionUtils.isNotEmpty(orderBy)) {
+        // Deduplicate the order-by expressions
         orderByExpressions = new ArrayList<>(orderBy.size());
+        Set<ExpressionContext> expressionSet = new HashSet<>();
         for (SelectionSort selectionSort : orderBy) {
-          orderByExpressions.add(
-              new 
OrderByExpressionContext(QueryContextConverterUtils.getExpression(selectionSort.getColumn()),
-                  selectionSort.isIsAsc()));
+          ExpressionContext expression = 
QueryContextConverterUtils.getExpression(selectionSort.getColumn());
+          if (expressionSet.add(expression)) {
+            orderByExpressions.add(new OrderByExpressionContext(expression, 
selectionSort.isIsAsc()));
+          }
         }
       }
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 772ffce..99d8f8b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -98,12 +98,13 @@ public class SelectionOperatorUtils {
 
     List<OrderByExpressionContext> orderByExpressions = 
queryContext.getOrderByExpressions();
     if (orderByExpressions != null && queryContext.getLimit() > 0) {
-      // NOTE: Order-by expressions are ignored for queries with LIMIT 0.
+      // NOTE:
+      //   1. Order-by expressions are ignored for queries with LIMIT 0.
+      //   2. Order-by expressions are already deduped in QueryContext.
       for (OrderByExpressionContext orderByExpression : orderByExpressions) {
         ExpressionContext expression = orderByExpression.getExpression();
-        if (expressionSet.add(expression)) {
-          expressions.add(expression);
-        }
+        expressionSet.add(expression);
+        expressions.add(expression);
       }
     }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 1589cc3..8f11ee9 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -200,7 +200,8 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 100000L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
0L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
400000L);
+    // 100000 * (2 order-by columns + 1 docId column) + 10 * (2 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
300020L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 100000L);
     DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = 
computeColumnNameToIndexMap(selectionDataSchema);
@@ -225,7 +226,8 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 15620L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
275416L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
62480L);
+    // 15620 * (2 order-by columns + 1 docId column) + 10 * (2 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
46880L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 100000L);
     selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 7294527..76e22dd 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -199,7 +199,8 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
0L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
120000L);
+    // 30000 * (2 order-by columns + 1 docId column) + 10 * (2 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
90020L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = 
computeColumnNameToIndexMap(selectionDataSchema);
@@ -224,7 +225,8 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
84134L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
24516L);
+    // 6129 * (2 order-by columns + 1 docId column) + 10 * (2 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
18407L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
     selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
@@ -254,7 +256,8 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
0L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
330000L);
+    // 30000 * (2 order-by columns + 1 docId column) + 10 * (9 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
90090L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = 
computeColumnNameToIndexMap(selectionDataSchema);
@@ -280,7 +283,8 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
84134L);
-    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
67419);
+    // 6129 * (2 order-by columns + 1 docId column) + 10 * (9 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
18477L);
     Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
     selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
@@ -301,6 +305,65 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals(((Integer) 
lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
   }
 
+  @Test
+  public void testSelectStarOrderByLargeOffsetLimit() {
+    String query = "SELECT * " + " FROM testTable" + ORDER_BY + " LIMIT 5000, 
7000";
+
+    // Test query without filter
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForQuery(query);
+    IntermediateResultsBlock resultsBlock = 
selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
+    Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
0L);
+    // 30000 * (2 order-by columns + 1 docId column) + 12000 * (9 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
198000L);
+    Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
+    Map<String, Integer> columnIndexMap = 
computeColumnNameToIndexMap(selectionDataSchema);
+
+    Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
+    Assert.assertEquals(selectionDataSchema.size(), 11);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    
Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
+        DataSchema.ColumnDataType.INT);
+    
Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
+        DataSchema.ColumnDataType.INT);
+    PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getSelectionResult();
+    Assert.assertEquals(selectionResult.size(), 12000);
+    Object[] lastRow = selectionResult.peek();
+    Assert.assertEquals(lastRow.length, 11);
+    Assert.assertEquals((int) lastRow[columnIndexMap.get("column6")], 
296467636);
+    Assert.assertEquals((int) lastRow[columnIndexMap.get("column1")], 
1715964282);
+
+    // Test query with filter
+    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    resultsBlock = selectionOrderByOperator.nextBlock();
+    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 
84134L);
+    // 6129 * (2 order-by columns + 1 docId column) + 6129 * (9 non-order-by 
columns)
+    Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 
73548L);
+    Assert.assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    selectionDataSchema = resultsBlock.getDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
+    Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
+    Assert.assertEquals(selectionDataSchema.size(), 11);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    
Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
+        DataSchema.ColumnDataType.INT);
+    
Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")),
+        DataSchema.ColumnDataType.INT);
+    selectionResult = (PriorityQueue<Object[]>) 
resultsBlock.getSelectionResult();
+    Assert.assertEquals(selectionResult.size(), 6129);
+    lastRow = selectionResult.peek();
+    Assert.assertEquals(lastRow.length, 11);
+    Assert.assertEquals((int) lastRow[columnIndexMap.get("column6")], 
499968041);
+    Assert.assertEquals((int) lastRow[columnIndexMap.get("column1")], 
335520083);
+  }
+
   private int getVirtualColumns(DataSchema selectionDataSchema) {
     int virtualCols = 0;
     for (int i = 0; i < selectionDataSchema.size(); ++i) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
index b7df980..bca78a0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -97,7 +97,6 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
   public void testSelectWithOrderByQuery() {
     int numSegmentsPerServer = getNumSegments();
     String query = "SELECT column11, column18, column1 FROM testTable ORDER BY 
column11";
-    int numColumnsInSelection = 3;
     BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
     assertNotNull(brokerResponse.getSelectionResults());
     assertNull(brokerResponse.getResultTable());
@@ -105,8 +104,9 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
     assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer 
* NUM_SERVERS);
     assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
     assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 
non-order-by columns) per segment
     assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
-        brokerResponse.getNumDocsScanned() * numColumnsInSelection);
+        brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * 
NUM_SERVERS);
     assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
 
     brokerResponse = getBrokerResponseForSqlQuery(query);
@@ -116,8 +116,9 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
     assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer 
* NUM_SERVERS);
     assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
     assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 
non-order-by columns) per segment
     assertEquals(brokerResponse.getNumEntriesScannedPostFilter(),
-        brokerResponse.getNumDocsScanned() * numColumnsInSelection);
+        brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * 
NUM_SERVERS);
     assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * 
NUM_SERVERS * NUM_DOCS_PER_SEGMENT);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
index 2685d31..b4859c6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/DictionaryBasedGroupKeyGeneratorTest.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.plan.TransformPlanNode;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import 
org.apache.pinot.core.query.aggregation.groupby.DictionaryBasedGroupKeyGenerator;
@@ -154,7 +155,8 @@ public class DictionaryBasedGroupKeyGeneratorTest {
     for (String column : MV_COLUMNS) {
       expressions.add(ExpressionContext.forIdentifier(column));
     }
-    TransformPlanNode transformPlanNode = new TransformPlanNode(indexSegment, 
queryContext, expressions);
+    TransformPlanNode transformPlanNode =
+        new TransformPlanNode(indexSegment, queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _transformOperator = transformPlanNode.run();
     _transformBlock = _transformOperator.nextBlock();
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
index b561320..16b7046 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.plan.TransformPlanNode;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
@@ -99,7 +100,8 @@ public class NoDictionaryGroupKeyGeneratorTest {
     for (String column : COLUMN_NAMES) {
       expressions.add(ExpressionContext.forIdentifier(column));
     }
-    TransformPlanNode transformPlanNode = new TransformPlanNode(indexSegment, 
queryContext, expressions);
+    TransformPlanNode transformPlanNode =
+        new TransformPlanNode(indexSegment, queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _transformOperator = transformPlanNode.run();
     _transformBlock = _transformOperator.nextBlock();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to