This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0811d3e3609 Create a DescDocIdSetOperator and improve order by desc 
when columns are sorted (#16789)
0811d3e3609 is described below

commit 0811d3e3609d8e4f92c2cf13789886b67573554a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Sep 23 16:03:11 2025 +0200

    Create a DescDocIdSetOperator and improve order by desc when columns are 
sorted (#16789)
---
 .../common/utils/config/QueryOptionsUtils.java     |  13 +-
 .../pinot/core/operator/BaseDocIdSetOperator.java  |  64 +++++++
 .../pinot/core/operator/BaseProjectOperator.java   |  12 +-
 .../core/operator/BitmapDocIdSetOperator.java      |  73 ++++++--
 ...ocIdSetBlock.java => DocIdOrderedOperator.java} |  35 ++--
 .../pinot/core/operator/DocIdSetOperator.java      |  18 +-
 ...rator.java => IntIteratorDocIdSetOperator.java} |  53 +++---
 .../pinot/core/operator/ProjectionOperator.java    |  32 +++-
 .../core/operator/ProjectionOperatorUtils.java     |   7 +-
 ...tOperator.java => ReverseDocIdSetOperator.java} |  67 ++++---
 .../pinot/core/operator/blocks/DocIdSetBlock.java  |  11 +-
 .../ExpressionScanDocIdIterator.java               |  25 ++-
 .../operator/query/SelectionOrderByOperator.java   |   4 +-
 .../SelectionPartiallyOrderedByDescOperation.java  |   6 +-
 ...SelectionPartiallyOrderedByLinearOperator.java} |  31 ++--
 .../core/operator/transform/TransformOperator.java |  17 ++
 .../apache/pinot/core/plan/DocIdSetPlanNode.java   |   7 +-
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  17 +-
 .../apache/pinot/core/plan/ProjectPlanNode.java    |  12 +-
 .../apache/pinot/core/plan/SelectionPlanNode.java  |  61 +++---
 .../combine/SelectionCombineOperatorTest.java      |  95 ++++++++--
 .../DefaultAggregationExecutorTest.java            |   3 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java | 205 +++++++++++++++------
 .../queries/TextMatchTransformFunctionTest.java    |   3 +-
 .../apache/pinot/perf/BenchmarkOrderByQueries.java | 167 +++++++----------
 .../forward/FixedBitSVForwardIndexReaderV2.java    |  17 ++
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +-
 .../apache/pinot/tools/SortedColumnQuickstart.java | 187 +++++++++++++++++++
 .../examples/batch/sorted/ingestionJobSpec.yaml    | 124 +++++++++++++
 29 files changed, 1030 insertions(+), 339 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 0ee27b145ed..a205f798daa 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -346,11 +346,6 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
   }
 
-  @Nullable
-  public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
-    return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
-  }
-
   @Nullable
   public static Integer getMultiStageLeafLimit(Map<String, String> 
queryOptions) {
     String maxLeafLimitStr = 
queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
@@ -582,4 +577,12 @@ public class QueryOptionsUtils {
   public static String getWorkloadName(Map<String, String> queryOptions) {
     return queryOptions.getOrDefault(QueryOptionKey.WORKLOAD_NAME, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
   }
+
+  public static boolean isReverseOrderAllowed(Map<String, String> 
queryOptions) {
+    String value = queryOptions.get(QueryOptionKey.ALLOW_REVERSE_ORDER);
+    if (value == null) {
+      return QueryOptionKey.DEFAULT_ALLOW_REVERSE_ORDER;
+    }
+    return Boolean.parseBoolean(value);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
new file mode 100644
index 00000000000..cefed0c67d4
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
+
+
+/// The base class for operators that produce [DocIdSetBlock].
+///
+/// These operators are the intermediate between
+/// [filter 
operators][org.apache.pinot.core.operator.filter.BaseFilterOperator] and
+/// [projection operators][org.apache.pinot.core.operator.ProjectionOperator].
+///
+/// Filter operators return a 
[org.apache.pinot.core.operator.blocks.FilterBlock], whose method
+/// 
[getBlockDocIdSet()][org.apache.pinot.core.operator.blocks.FilterBlock#getBlockDocIdSet()]
 is used
+/// to build different types of [BaseDocIdSetOperator]s (e.g. 
[DocIdSetOperator]). Contrary to filter operators,
+/// whose 
[nextBlock()][org.apache.pinot.core.operator.filter.BaseFilterOperator#nextBlock()]
 method returns always
+/// the same block (which contains all the matched document ids for the 
segment),
+/// **[DocIdSetOperator.nextBlock()][BaseDocIdSetOperator#nextBlock()] splits 
the segment into multiple blocks and
+/// therefore must be called multiple times until it returns `null`**.
+///
+/// The blocks returned by [BaseDocIdSetOperator] can be, and usually are, 
returned in a given order. Most of the time, the
+/// order is ascending, which means that the blocks and the rows inside them 
are sorted by their document ids in
+/// ascending order, but may be changed depending on the query. For example 
the optimizer may decide to configure
+/// operators to emit rows in descending order to optimize the performance of 
top-N queries on segments sorted in
+/// ascending order.
+public abstract class BaseDocIdSetOperator extends BaseOperator<DocIdSetBlock>
+    implements DocIdOrderedOperator<DocIdSetBlock> {
+
+  /// Returns a [BaseDocIdSetOperator] that is compatible with the requested 
order or fails with
+  /// [UnsupportedOperationException] if the order is not supported for this 
operator.
+  ///
+  /// It may return `this` if the order is already the requested one.
+  ///
+  /// @return a [BaseDocIdSetOperator] that is ascending if `ascending` is 
true or descending otherwise. Remember that
+  ///         an operator may be both ascending and descending if it is empty.
+  /// @throws UnsupportedOperationException if the order is not supported for 
this operator.
+  public abstract BaseDocIdSetOperator withOrder(DocIdOrder order) throws 
UnsupportedOperationException;
+
+  /// Returns the next block of document ids that match the filter (if any).
+  ///
+  /// Each time this method is called, it returns the next block of document 
ids, or `null` if there are no more
+  /// blocks.
+  @Override
+  @Nullable
+  protected abstract DocIdSetBlock getNextBlock();
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
index aaaa8cc4155..4c6d96f67e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
@@ -23,7 +23,8 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 
 
-public abstract class BaseProjectOperator<T extends ValueBlock> extends 
BaseOperator<T> {
+public abstract class BaseProjectOperator<T extends ValueBlock> extends 
BaseOperator<T>
+    implements DocIdOrderedOperator<T> {
 
   /**
    * Returns a map from source column name to context.
@@ -41,4 +42,13 @@ public abstract class BaseProjectOperator<T extends 
ValueBlock> extends BaseOper
   public int getNumColumnsProjected() {
     return getSourceColumnContextMap().size();
   }
+
+  /**
+   * Returns an instance of {@link BaseProjectOperator} with the given order.
+   *
+   * The instance can be the same as the current one if the requested order is 
the same as the current one.
+   *
+   * @throws UnsupportedOperationException if the order cannot be changed
+   */
+  public abstract BaseProjectOperator<T> withOrder(DocIdOrder newOrder);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
index 4f203599c1e..549eda9e1bf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
@@ -32,42 +32,61 @@ import org.roaringbitmap.IntIterator;
  * <p>Should call {@link #nextBlock()} multiple times until it returns 
<code>null</code> (already exhausts all the
  * documents) or already gathered enough documents (for selection queries).
  */
-public class BitmapDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
-
+public class BitmapDocIdSetOperator extends BaseDocIdSetOperator {
   private static final String EXPLAIN_NAME = "DOC_ID_SET_BITMAP";
 
+  private final ImmutableBitmapDataProvider _docIds;
+  private final int[] _docIdBuffer;
+  private final DocIdOrder _docIdOrder;
+
   // TODO: Consider using BatchIterator to fill the document ids. Currently 
BatchIterator only reads bits for one
   //       container instead of trying to fill up the buffer with bits from 
multiple containers. If in the future
   //       BatchIterator provides an API to fill up the buffer, switch to 
BatchIterator.
-  private final IntIterator _intIterator;
-  private final int[] _docIdBuffer;
+  private IntIterator _docIdIterator;
+
+  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider docIds, int[] 
docIdBuffer, DocIdOrder docIdOrder) {
+    _docIds = docIds;
+    _docIdBuffer = docIdBuffer;
+    _docIdOrder = docIdOrder;
+  }
 
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+  public BitmapDocIdSetOperator(IntIterator docIdIterator, int[] docIdBuffer, 
DocIdOrder docIdOrder) {
+    _docIds = null;
+    _docIdIterator = docIdIterator;
+    _docIdBuffer = docIdBuffer;
+    _docIdOrder = docIdOrder;
   }
 
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int 
numDocs) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
+  public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider 
docIds) {
+    return ascending(docIds, new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
   }
 
-  public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
-    _intIterator = intIterator;
-    _docIdBuffer = docIdBuffer;
+  public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider 
docIds, int numDocs) {
+    return ascending(docIds, new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)]);
   }
 
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[] 
docIdBuffer) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = docIdBuffer;
+  public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider 
docIds, int[] docIdBuffer) {
+    return new BitmapDocIdSetOperator(docIds, docIdBuffer, DocIdOrder.ASC);
+  }
+
+  public static BitmapDocIdSetOperator descending(ImmutableBitmapDataProvider 
docIds, int numDocs) {
+    return descending(docIds, new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)]);
+  }
+
+  public static BitmapDocIdSetOperator descending(ImmutableBitmapDataProvider 
bitmap, int[] docIdBuffer) {
+    return new BitmapDocIdSetOperator(bitmap, docIdBuffer, DocIdOrder.DESC);
   }
 
   @Override
   protected DocIdSetBlock getNextBlock() {
+    if (_docIdIterator == null) {
+      assert _docIds != null;
+      _docIdIterator = _docIdOrder == DocIdOrder.ASC ? 
_docIds.getIntIterator() : _docIds.getReverseIntIterator();
+    }
     int bufferSize = _docIdBuffer.length;
     int index = 0;
-    while (index < bufferSize && _intIterator.hasNext()) {
-      _docIdBuffer[index++] = _intIterator.next();
+    while (index < bufferSize && _docIdIterator.hasNext()) {
+      _docIdBuffer[index++] = _docIdIterator.next();
     }
     if (index > 0) {
       return new DocIdSetBlock(_docIdBuffer, index);
@@ -76,7 +95,6 @@ public class BitmapDocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     }
   }
 
-
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
@@ -86,4 +104,21 @@ public class BitmapDocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
   public List<Operator> getChildOperators() {
     return Collections.emptyList();
   }
+
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return _docIdOrder == order;
+  }
+
+  @Override
+  public BaseDocIdSetOperator withOrder(DocIdOrder order)
+      throws UnsupportedOperationException {
+    if (isCompatibleWith(order)) {
+      return this;
+    }
+    if (_docIds == null) {
+      throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support 
changing its order");
+    }
+    return new BitmapDocIdSetOperator(_docIds, _docIdBuffer, order);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
similarity index 55%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
index d4bb1562769..5ec60a26995 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
@@ -16,30 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator.blocks;
+package org.apache.pinot.core.operator;
 
 import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.common.Operator;
 
 
-/**
- * The {@code DocIdSetBlock} contains a block of document ids (sorted), and is 
returned from {@link DocIdSetOperator}.
- * Each {@code DocIdSetOperator} can return multiple {@code DocIdSetBlock}s.
- */
-public class DocIdSetBlock implements Block {
-  private final int[] _docIds;
-  private final int _length;
+/// An operator whose returned blocks follow (or can be made to follow) a 
specific document-id order.
+public interface DocIdOrderedOperator<T extends Block> extends Operator<T> {
+  /// Returns true if the operator is ordered by docId in the specified order.
+  ///
+  /// Remember that empty operators or operators that return a single row are 
considered ordered.
+  boolean isCompatibleWith(DocIdOrder order);
 
-  public DocIdSetBlock(int[] docIds, int length) {
-    _docIds = docIds;
-    _length = length;
-  }
-
-  public int[] getDocIds() {
-    return _docIds;
-  }
+  enum DocIdOrder {
+    /// The rows are sorted in strictly ascending docId order.
+    ASC,
+    /// The rows are sorted in strictly descending docId order.
+    DESC;
 
-  public int getLength() {
-    return _length;
+    public static DocIdOrder fromAsc(boolean asc) {
+      return asc ? ASC : DESC;
+    }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index 706de6d4fe4..b8fd1a47380 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -32,11 +32,12 @@ import org.apache.pinot.spi.trace.Tracing;
 
 
 /**
- * The <code>DocIdSetOperator</code> takes a filter operator and returns 
blocks with set of the matched document Ids.
+ * The <code>DocIdSetOperator</code> takes a filter operator and 
returns blocks with the set of matched
+ * document Ids.
  * <p>Should call {@link #nextBlock()} multiple times until it returns 
<code>null</code> (already exhausts all the
  * matched documents) or already gathered enough documents (for selection 
queries).
  */
-public class DocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+public class DocIdSetOperator extends BaseDocIdSetOperator {
   private static final String EXPLAIN_NAME = "DOC_ID_SET";
 
   private static final ThreadLocal<int[]> THREAD_LOCAL_DOC_IDS =
@@ -106,4 +107,17 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     long numEntriesScannedInFilter = _blockDocIdSet != null ? 
_blockDocIdSet.getNumEntriesScannedInFilter() : 0;
     return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
   }
+
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return order == DocIdOrder.ASC;
+  }
+
+  @Override
+  public BaseDocIdSetOperator withOrder(DocIdOrder order) {
+    if (isCompatibleWith(order)) {
+      return this;
+    }
+    return new ReverseDocIdSetOperator(_filterOperator, _maxSizeOfDocIdSet);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
similarity index 59%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
index 4f203599c1e..4f1da707e50 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
@@ -18,51 +18,46 @@
  */
 package org.apache.pinot.core.operator;
 
-import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
-import org.roaringbitmap.ImmutableBitmapDataProvider;
 import org.roaringbitmap.IntIterator;
 
 
-/**
- * The <code>BitmapDocIdSetOperator</code> takes a bitmap of document ids and 
returns blocks of document ids.
- * <p>Should call {@link #nextBlock()} multiple times until it returns 
<code>null</code> (already exhausts all the
- * documents) or already gathered enough documents (for selection queries).
- */
-public class BitmapDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+public class IntIteratorDocIdSetOperator extends BaseDocIdSetOperator {
 
-  private static final String EXPLAIN_NAME = "DOC_ID_SET_BITMAP";
+  private static final String EXPLAIN_NAME = "DOC_ID_SET_ITERATOR";
 
   // TODO: Consider using BatchIterator to fill the document ids. Currently 
BatchIterator only reads bits for one
   //       container instead of trying to fill up the buffer with bits from 
multiple containers. If in the future
   //       BatchIterator provides an API to fill up the buffer, switch to 
BatchIterator.
   private final IntIterator _intIterator;
   private final int[] _docIdBuffer;
+  private final DocIdOrder _docIdOrder;
 
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-  }
-
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int 
numDocs) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
-  }
-
-  public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
+  public IntIteratorDocIdSetOperator(IntIterator intIterator, int[] 
docIdBuffer, DocIdOrder docIdOrder) {
     _intIterator = intIterator;
     _docIdBuffer = docIdBuffer;
+    _docIdOrder = docIdOrder;
   }
 
-  public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[] 
docIdBuffer) {
-    _intIterator = bitmap.getIntIterator();
-    _docIdBuffer = docIdBuffer;
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return _docIdOrder == order;
   }
 
   @Override
+  public BaseDocIdSetOperator withOrder(DocIdOrder order)
+      throws UnsupportedOperationException {
+    if (_docIdOrder != order) {
+      throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support 
changing its order");
+    }
+    return this;
+  }
+
+  @Override
+  @Nullable
   protected DocIdSetBlock getNextBlock() {
     int bufferSize = _docIdBuffer.length;
     int index = 0;
@@ -76,14 +71,14 @@ public class BitmapDocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     }
   }
 
-
   @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
+  public List<? extends Operator> getChildOperators() {
+    return List.of();
   }
 
+  @Nullable
   @Override
-  public List<Operator> getChildOperators() {
-    return Collections.emptyList();
+  public String toExplainString() {
+    return EXPLAIN_NAME;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
index b3e76028dbc..85b07166d27 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
@@ -40,14 +40,14 @@ public class ProjectionOperator extends 
BaseProjectOperator<ProjectionBlock> imp
   protected static final String EXPLAIN_NAME = "PROJECT";
 
   protected final Map<String, DataSource> _dataSourceMap;
-  protected final BaseOperator<DocIdSetBlock> _docIdSetOperator;
+  protected final BaseDocIdSetOperator _docIdSetOperator;
   protected final DataFetcher _dataFetcher;
   protected final DataBlockCache _dataBlockCache;
   protected final Map<String, ColumnContext> _columnContextMap;
   protected final QueryContext _queryContext;
 
   public ProjectionOperator(Map<String, DataSource> dataSourceMap,
-      @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext 
queryContext) {
+      @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext 
queryContext) {
     _dataSourceMap = dataSourceMap;
     _docIdSetOperator = docIdSetOperator;
     _dataFetcher = new DataFetcher(dataSourceMap);
@@ -58,6 +58,17 @@ public class ProjectionOperator extends 
BaseProjectOperator<ProjectionBlock> imp
     _queryContext = queryContext;
   }
 
+  private ProjectionOperator(Map<String, DataSource> dataSourceMap,
+      @Nullable BaseDocIdSetOperator docIdSetOperator, DataFetcher 
dataFetcher, DataBlockCache dataBlockCache,
+      Map<String, ColumnContext> columnContextMap, QueryContext queryContext) {
+    _dataSourceMap = dataSourceMap;
+    _docIdSetOperator = docIdSetOperator;
+    _dataFetcher = dataFetcher;
+    _dataBlockCache = dataBlockCache;
+    _columnContextMap = columnContextMap;
+    _queryContext = queryContext;
+  }
+
   @Override
   public Map<String, ColumnContext> getSourceColumnContextMap() {
     return _columnContextMap;
@@ -93,7 +104,7 @@ public class ProjectionOperator extends 
BaseProjectOperator<ProjectionBlock> imp
   public String toExplainString() {
     StringBuilder stringBuilder = new StringBuilder(EXPLAIN_NAME).append('(');
     // SQL statements such as SELECT 'literal' FROM myTable don't have any 
projection columns.
-    if (!_dataSourceMap.keySet().isEmpty()) {
+    if (!_dataSourceMap.isEmpty()) {
       int count = 0;
       for (String col : _dataSourceMap.keySet()) {
         if (count == _dataSourceMap.keySet().size() - 1) {
@@ -123,6 +134,21 @@ public class ProjectionOperator extends 
BaseProjectOperator<ProjectionBlock> imp
     return _docIdSetOperator != null ? 
_docIdSetOperator.getExecutionStatistics() : new ExecutionStatistics(0, 0, 0, 
0);
   }
 
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return _docIdSetOperator == null || 
_docIdSetOperator.isCompatibleWith(order);
+  }
+
+  @Override
+  public BaseProjectOperator<ProjectionBlock> withOrder(DocIdOrder newOrder) {
+    BaseDocIdSetOperator orderedOperator = 
_docIdSetOperator.withOrder(newOrder);
+    if (orderedOperator == _docIdSetOperator) {
+      return this;
+    }
+    return new ProjectionOperator(_dataSourceMap, orderedOperator, 
_dataFetcher, _dataBlockCache, _columnContextMap,
+        _queryContext);
+  }
+
   @Override
   public void close() {
     _dataBlockCache.close();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
index 151a597a18d..508be8520a6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.operator;
 
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 
@@ -36,7 +35,7 @@ public class ProjectionOperatorUtils {
   }
 
   public static ProjectionOperator getProjectionOperator(Map<String, 
DataSource> dataSourceMap,
-      @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext 
queryContext) {
+      @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext 
queryContext) {
     return _instance.getProjectionOperator(dataSourceMap, docIdSetOperator, 
queryContext);
   }
 
@@ -45,13 +44,13 @@ public class ProjectionOperatorUtils {
      * Returns the projection operator
      */
     ProjectionOperator getProjectionOperator(Map<String, DataSource> 
dataSourceMap,
-        @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext 
queryContext);
+        @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext 
queryContext);
   }
 
   public static class DefaultImplementation implements Implementation {
     @Override
     public ProjectionOperator getProjectionOperator(Map<String, DataSource> 
dataSourceMap,
-        @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext 
queryContext) {
+        @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext 
queryContext) {
       return new ProjectionOperator(dataSourceMap, docIdSetOperator, 
queryContext);
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
similarity index 62%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
index 706de6d4fe4..7ecd45e3a5f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
@@ -19,25 +19,23 @@
 package org.apache.pinot.core.operator;
 
 import com.google.common.base.Preconditions;
-import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
+import org.apache.pinot.core.operator.dociditerators.BitmapBasedDocIdIterator;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.spi.trace.Tracing;
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
 
 
-/**
- * The <code>DocIdSetOperator</code> takes a filter operator and returns 
blocks with set of the matched document Ids.
- * <p>Should call {@link #nextBlock()} multiple times until it returns 
<code>null</code> (already exhausts all the
- * matched documents) or already gathered enough documents (for selection 
queries).
- */
-public class DocIdSetOperator extends BaseOperator<DocIdSetBlock> {
-  private static final String EXPLAIN_NAME = "DOC_ID_SET";
+public class ReverseDocIdSetOperator extends BaseDocIdSetOperator {
+  private static final String EXPLAIN_NAME = "REVERSE_DOC_ID_SET";
 
   private static final ThreadLocal<int[]> THREAD_LOCAL_DOC_IDS =
       ThreadLocal.withInitial(() -> new 
int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
@@ -46,10 +44,10 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
   private final int _maxSizeOfDocIdSet;
 
   private BlockDocIdSet _blockDocIdSet;
-  private BlockDocIdIterator _blockDocIdIterator;
   private int _currentDocId = 0;
+  private IntIterator _reverseIterator;
 
-  public DocIdSetOperator(BaseFilterOperator filterOperator, int 
maxSizeOfDocIdSet) {
+  public ReverseDocIdSetOperator(BaseFilterOperator filterOperator, int 
maxSizeOfDocIdSet) {
     Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <= 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _filterOperator = filterOperator;
     _maxSizeOfDocIdSet = maxSizeOfDocIdSet;
@@ -57,25 +55,20 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
 
   @Override
   protected DocIdSetBlock getNextBlock() {
-    if (_currentDocId == Constants.EOF) {
-      return null;
+    if (_reverseIterator == null) {
+      initializeBitmap();
     }
 
-    // Initialize filter block document Id set
-    if (_blockDocIdSet == null) {
-      _blockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
-      _blockDocIdIterator = _blockDocIdSet.iterator();
+    if (_currentDocId == Constants.EOF) {
+      return null;
     }
 
     Tracing.ThreadAccountantOps.sample();
 
     int pos = 0;
     int[] docIds = THREAD_LOCAL_DOC_IDS.get();
-    for (int i = 0; i < _maxSizeOfDocIdSet; i++) {
-      _currentDocId = _blockDocIdIterator.next();
-      if (_currentDocId == Constants.EOF) {
-        break;
-      }
+    for (int i = 0; i < _maxSizeOfDocIdSet && _reverseIterator.hasNext(); i++) 
{
+      _currentDocId = _reverseIterator.next();
       docIds[pos++] = _currentDocId;
     }
     if (pos > 0) {
@@ -85,6 +78,22 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     }
   }
 
+  private void initializeBitmap() {
+    _blockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
+    BlockDocIdIterator iterator = _blockDocIdSet.iterator();
+    if (iterator instanceof BitmapBasedDocIdIterator) {
+      _reverseIterator = ((BitmapBasedDocIdIterator) 
iterator).getDocIds().getReverseIntIterator();
+    } else {
+      RoaringBitmapWriter<RoaringBitmap> writer = 
RoaringBitmapWriter.writer().get();
+      int docId = iterator.next();
+      while (docId != Constants.EOF) {
+        writer.add(docId);
+        docId = iterator.next();
+      }
+      _reverseIterator = writer.get().getReverseIntIterator();
+    }
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
@@ -92,7 +101,7 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
 
   @Override
   public List<Operator> getChildOperators() {
-    return Collections.singletonList(_filterOperator);
+    return List.of(_filterOperator);
   }
 
   @Override
@@ -106,4 +115,18 @@ public class DocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     long numEntriesScannedInFilter = _blockDocIdSet != null ? 
_blockDocIdSet.getNumEntriesScannedInFilter() : 0;
     return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
   }
+
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return order == DocIdOrder.DESC;
+  }
+
+  @Override
+  public BaseDocIdSetOperator withOrder(DocIdOrder order)
+      throws UnsupportedOperationException {
+    if (isCompatibleWith(order)) {
+      return this;
+    }
+    return new DocIdSetOperator(_filterOperator, _maxSizeOfDocIdSet);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
index d4bb1562769..af31e1520f5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
@@ -19,12 +19,17 @@
 package org.apache.pinot.core.operator.blocks;
 
 import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.BaseDocIdSetOperator;
 
 
 /**
- * The {@code DocIdSetBlock} contains a block of document ids (sorted), and is 
returned from {@link DocIdSetOperator}.
- * Each {@code DocIdSetOperator} can return multiple {@code DocIdSetBlock}s.
+ * The {@code DocIdSetBlock} contains a block of document ids and is returned 
from {@link BaseDocIdSetOperator}.
+ *
+ * Each {@code BaseDocIdSetOperator} can return multiple {@code 
DocIdSetBlock}s and each block contains an array of
+ * document ids.
+ *
+ * Do not confuse this class with {@link 
org.apache.pinot.core.common.BlockDocIdSet}, which is returned by
+ * the filter operator and contains a set of document ids that can be iterated 
through.
  */
 public class DocIdSetBlock implements Block {
   private final int[] _docIds;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 03d16f15e8a..d64bcdc6128 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -25,8 +25,9 @@ import java.util.Map;
 import java.util.OptionalInt;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.BaseDocIdSetOperator;
 import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
 import org.apache.pinot.core.operator.ProjectionOperator;
 import org.apache.pinot.core.operator.ProjectionOperatorUtils;
 import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
@@ -127,8 +128,10 @@ public final class ExpressionScanDocIdIterator implements 
ScanBasedDocIdIterator
   @Override
   public MutableRoaringBitmap applyAnd(BatchIterator batchIterator, 
OptionalInt firstDoc, OptionalInt lastDoc) {
     IntIterator intIterator = batchIterator.asIntIterator(new 
int[OPTIMAL_ITERATOR_BATCH_SIZE]);
+    BaseDocIdSetOperator docIdSetOperator =
+        new BitmapDocIdSetOperator(intIterator, _docIdBuffer, 
DocIdOrderedOperator.DocIdOrder.ASC);
     try (ProjectionOperator projectionOperator = 
ProjectionOperatorUtils.getProjectionOperator(_dataSourceMap,
-        new BitmapDocIdSetOperator(intIterator, _docIdBuffer), _queryContext)) 
{
+        docIdSetOperator, _queryContext)) {
       MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
       ProjectionBlock projectionBlock;
       while ((projectionBlock = projectionOperator.nextBlock()) != null) {
@@ -141,7 +144,7 @@ public final class ExpressionScanDocIdIterator implements 
ScanBasedDocIdIterator
   @Override
   public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
     try (ProjectionOperator projectionOperator = 
ProjectionOperatorUtils.getProjectionOperator(_dataSourceMap,
-        new BitmapDocIdSetOperator(docIds, _docIdBuffer), _queryContext)) {
+        new BitmapDocIdSetOperator(docIds, _docIdBuffer, 
DocIdOrderedOperator.DocIdOrder.ASC), _queryContext)) {
       MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
       ProjectionBlock projectionBlock;
       while ((projectionBlock = projectionOperator.nextBlock()) != null) {
@@ -420,7 +423,7 @@ public final class ExpressionScanDocIdIterator implements 
ScanBasedDocIdIterator
   /**
    * NOTE: This operator contains only one block.
    */
-  private class RangeDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+  private class RangeDocIdSetOperator extends BaseDocIdSetOperator {
     static final String EXPLAIN_NAME = "DOC_ID_SET_RANGE";
 
     DocIdSetBlock _docIdSetBlock;
@@ -449,6 +452,20 @@ public final class ExpressionScanDocIdIterator implements 
ScanBasedDocIdIterator
     public List<Operator> getChildOperators() {
       return Collections.emptyList();
     }
+
+    @Override
+    public boolean isCompatibleWith(DocIdOrder order) {
+      return DocIdOrder.ASC == order;
+    }
+
+    @Override
+    public BaseDocIdSetOperator withOrder(DocIdOrder order)
+        throws UnsupportedOperationException {
+      if (order == DocIdOrder.ASC) {
+        return this;
+      }
+      throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support 
descending order");
+    }
   }
 
   public enum PredicateEvaluationResult {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index d77d4cfaf4b..5db3e914930 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -282,9 +282,9 @@ public class SelectionOrderByOperator extends 
BaseOperator<SelectionResultsBlock
       dataSourceMap.put(column, _indexSegment.getDataSource(column, 
_queryContext.getSchema()));
     }
 
+    BitmapDocIdSetOperator docIdOperator = 
BitmapDocIdSetOperator.ascending(docIds, numRows);
     try (ProjectionOperator projectionOperator =
-        ProjectionOperatorUtils.getProjectionOperator(dataSourceMap, new 
BitmapDocIdSetOperator(docIds, numRows),
-            _queryContext)) {
+        ProjectionOperatorUtils.getProjectionOperator(dataSourceMap, 
docIdOperator, _queryContext)) {
       TransformOperator transformOperator =
           new TransformOperator(_queryContext, projectionOperator, 
nonOrderByExpressions);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
index be5a9b3fb25..fb318d85be3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -48,8 +48,10 @@ public class SelectionPartiallyOrderedByDescOperation 
extends LinearSelectionOrd
     super(indexSegment, queryContext, expressions, projectOperator, 
numSortedExpressions);
     assert queryContext.getOrderByExpressions() != null;
     Preconditions.checkArgument(!queryContext.getOrderByExpressions().stream()
-            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER).findFirst()
-            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers")).isAsc(),
+            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers"))
+            .isAsc(),
         "%s can only be used when the first column in order by is DESC", 
EXPLAIN_NAME);
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
similarity index 66%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
index ff4c1c8654e..eaedac0003c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
@@ -18,37 +18,46 @@
  */
 package org.apache.pinot.core.operator.query;
 
-import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
 /**
- * An operator for order-by queries ASC that are partially sorted over the 
sorting keys.
+ * The operator used when selecting with order-by in a way that the segment 
layout can be used to prune results.
+ *
+ * This requires that the first order-by expression is on a column that is 
sorted in the segment and:
+ * 1. The order-by is ASC
+ * 2. The order-by is DESC and the input operators support descending 
iteration (e.g. full scan)
+ *
  * @see LinearSelectionOrderByOperator
  */
-public class SelectionPartiallyOrderedByAscOperator extends 
LinearSelectionOrderByOperator {
+public class SelectionPartiallyOrderedByLinearOperator extends 
LinearSelectionOrderByOperator {
 
-  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_ASC";
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_LINEAR";
 
   private int _numDocsScanned = 0;
 
-  public SelectionPartiallyOrderedByAscOperator(IndexSegment indexSegment, 
QueryContext queryContext,
+  public SelectionPartiallyOrderedByLinearOperator(IndexSegment indexSegment, 
QueryContext queryContext,
       List<ExpressionContext> expressions, BaseProjectOperator<?> 
projectOperator, int numSortedExpressions) {
     super(indexSegment, queryContext, expressions, projectOperator, 
numSortedExpressions);
-    Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
-            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
-            .findFirst()
-            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers"))
-            .isAsc(),
-        "%s can only be used when the first column in order by is ASC", 
EXPLAIN_NAME);
+    boolean firstColIsAsc = queryContext.getOrderByExpressions().stream()
+        .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers"))
+        .isAsc();
+    DocIdOrderedOperator.DocIdOrder docIdOrder = 
DocIdOrderedOperator.DocIdOrder.fromAsc(firstColIsAsc);
+    if (!projectOperator.isCompatibleWith(docIdOrder)) {
+      throw new IllegalStateException(EXPLAIN_NAME + " requires the input 
operator to be compatible with order: "
+          + docIdOrder + ", but found: " + projectOperator.toExplainString());
+    }
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index d4d003ae038..78418e2fdba 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -60,6 +60,13 @@ public class TransformOperator extends 
BaseProjectOperator<TransformBlock> {
     }
   }
 
+  private TransformOperator(
+      BaseProjectOperator<?> projectOperator,
+      Map<ExpressionContext, TransformFunction> transformFunctionMap) {
+    _projectOperator = projectOperator;
+    _transformFunctionMap = transformFunctionMap;
+  }
+
   @Override
   public Map<String, ColumnContext> getSourceColumnContextMap() {
     return _projectOperator.getSourceColumnContextMap();
@@ -110,4 +117,14 @@ public class TransformOperator extends 
BaseProjectOperator<TransformBlock> {
   public ExecutionStatistics getExecutionStatistics() {
     return _projectOperator.getExecutionStatistics();
   }
+
+  @Override
+  public boolean isCompatibleWith(DocIdOrder order) {
+    return _projectOperator.isCompatibleWith(order);
+  }
+
+  @Override
+  public BaseProjectOperator<TransformBlock> withOrder(DocIdOrder newOrder) {
+    return new TransformOperator(_projectOperator.withOrder(newOrder), 
_transformFunctionMap);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
index 0e3558014b5..1b7fcbef856 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.plan;
 
-import javax.annotation.Nullable;
 import org.apache.pinot.core.operator.DocIdSetOperator;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -33,7 +32,7 @@ public class DocIdSetPlanNode implements PlanNode {
   private final BaseFilterOperator _filterOperator;
 
   public DocIdSetPlanNode(SegmentContext segmentContext, QueryContext 
queryContext, int maxDocPerCall,
-      @Nullable BaseFilterOperator filterOperator) {
+      BaseFilterOperator filterOperator) {
     assert maxDocPerCall > 0 && maxDocPerCall <= MAX_DOC_PER_CALL;
 
     _segmentContext = segmentContext;
@@ -44,8 +43,6 @@ public class DocIdSetPlanNode implements PlanNode {
 
   @Override
   public DocIdSetOperator run() {
-    return new DocIdSetOperator(
-        _filterOperator != null ? _filterOperator : new 
FilterPlanNode(_segmentContext, _queryContext).run(),
-        _maxDocPerCall);
+    return new DocIdSetOperator(_filterOperator, _maxDocPerCall);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 238d83363ed..d7770a0b4a0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -214,10 +214,12 @@ public class FilterPlanNode implements PlanNode {
    * Helper method to build the operator tree from the filter.
    */
   private BaseFilterOperator constructPhysicalOperator(FilterContext filter, 
int numDocs) {
+    List<FilterContext> childFilters;
+    List<BaseFilterOperator> childFilterOperators;
     switch (filter.getType()) {
       case AND:
-        List<FilterContext> childFilters = filter.getChildren();
-        List<BaseFilterOperator> childFilterOperators = new 
ArrayList<>(childFilters.size());
+        childFilters = filter.getChildren();
+        childFilterOperators = new ArrayList<>(childFilters.size());
         for (FilterContext childFilter : childFilters) {
           BaseFilterOperator childFilterOperator = 
constructPhysicalOperator(childFilter, numDocs);
           if (childFilterOperator.isResultEmpty()) {
@@ -266,9 +268,10 @@ public class FilterPlanNode implements PlanNode {
           String column = lhs.getIdentifier();
           DataSource dataSource = _indexSegment.getDataSource(column, 
_queryContext.getSchema());
           PredicateEvaluator predicateEvaluator;
+          TextIndexReader textIndexReader;
           switch (predicate.getType()) {
             case TEXT_CONTAINS:
-              TextIndexReader textIndexReader = dataSource.getTextIndex();
+              textIndexReader = dataSource.getTextIndex();
               if (!(textIndexReader instanceof NativeTextIndexReader)
                   && !(textIndexReader instanceof NativeMutableTextIndex)) {
                 throw new UnsupportedOperationException("TEXT_CONTAINS is 
supported only on native text index");
@@ -340,20 +343,22 @@ public class FilterPlanNode implements PlanNode {
               Preconditions.checkState(vectorIndex != null,
                   "Cannot apply VECTOR_SIMILARITY on column: %s without vector 
index", column);
               return new VectorSimilarityFilterOperator(vectorIndex, 
(VectorSimilarityPredicate) predicate, numDocs);
-            case IS_NULL:
+            case IS_NULL: {
               NullValueVectorReader nullValueVector = 
dataSource.getNullValueVector();
               if (nullValueVector != null) {
                 return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
               } else {
                 return EmptyFilterOperator.getInstance();
               }
-            case IS_NOT_NULL:
-              nullValueVector = dataSource.getNullValueVector();
+            }
+            case IS_NOT_NULL: {
+              NullValueVectorReader nullValueVector = 
dataSource.getNullValueVector();
               if (nullValueVector != null) {
                 return new 
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
               } else {
                 return new MatchAllFilterOperator(numDocs);
               }
+            }
             default:
               predicateEvaluator =
                   PredicateEvaluatorProvider.getPredicateEvaluator(predicate, 
dataSource, _queryContext);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
index a807b22417b..9aa7e1057f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -50,7 +49,7 @@ public class ProjectPlanNode implements PlanNode {
   private final BaseFilterOperator _filterOperator;
 
   public ProjectPlanNode(SegmentContext segmentContext, QueryContext 
queryContext,
-      Collection<ExpressionContext> expressions, int maxDocsPerCall, @Nullable 
BaseFilterOperator filterOperator) {
+      Collection<ExpressionContext> expressions, int maxDocsPerCall, 
BaseFilterOperator filterOperator) {
     _indexSegment = segmentContext.getIndexSegment();
     _segmentContext = segmentContext;
     _queryContext = queryContext;
@@ -61,7 +60,8 @@ public class ProjectPlanNode implements PlanNode {
 
   public ProjectPlanNode(SegmentContext segmentContext, QueryContext 
queryContext,
       Collection<ExpressionContext> expressions, int maxDocsPerCall) {
-    this(segmentContext, queryContext, expressions, maxDocsPerCall, null);
+    this(segmentContext, queryContext, expressions, maxDocsPerCall,
+        new FilterPlanNode(segmentContext, queryContext).run());
   }
 
   @Override
@@ -78,9 +78,9 @@ public class ProjectPlanNode implements PlanNode {
     projectionColumns.forEach(
         column -> dataSourceMap.put(column, 
_indexSegment.getDataSource(column, _queryContext.getSchema())));
     // NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for 
selection query with LIMIT 0)
-    DocIdSetOperator docIdSetOperator =
-        _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_segmentContext, 
_queryContext, _maxDocsPerCall,
-            _filterOperator).run() : null;
+    DocIdSetOperator docIdSetOperator = _maxDocsPerCall > 0
+        ? new DocIdSetPlanNode(_segmentContext, _queryContext, 
_maxDocsPerCall, _filterOperator).run()
+        : null;
 
     // TODO: figure out a way to close this operator, as it may hold reader 
context
     ProjectionOperator projectionOperator =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index d59345e1846..fdb68d9866f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -20,18 +20,18 @@ package org.apache.pinot.core.plan;
 
 import java.util.ArrayList;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
 import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByLinearOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -79,24 +79,26 @@ public class SelectionPlanNode implements PlanNode {
     // Although it is a break of abstraction, some code, specially merging, 
assumes that if there is an order by
     // expression the operator will return a block whose selection result is a 
priority queue.
     int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions, 
_queryContext.isNullHandlingEnabled());
-    OrderByAlgorithm orderByAlgorithm = 
OrderByAlgorithm.fromQueryContext(_queryContext);
-    if (sortedColumnsPrefixSize > 0 && orderByAlgorithm != 
OrderByAlgorithm.NAIVE) {
+    if (sortedColumnsPrefixSize > 0) {
       int maxDocsPerCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
       // The first order by expressions are sorted (either asc or desc).
       // ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column 
DESC LIMIT 10 OFFSET 5
       // or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column, 
not_sorted LIMIT 10 OFFSET 5
       // but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted, 
sorted_column LIMIT 10 OFFSET 5
-      if (orderByExpressions.get(0).isAsc()) {
-        if (sortedColumnsPrefixSize == orderByExpressions.size()) {
-          maxDocsPerCall = Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
-        }
-        BaseProjectOperator<?> projectOperator =
-            new ProjectPlanNode(_segmentContext, _queryContext, expressions, 
maxDocsPerCall).run();
-        return new SelectionPartiallyOrderedByAscOperator(_indexSegment, 
_queryContext, expressions, projectOperator,
+
+      if (sortedColumnsPrefixSize == orderByExpressions.size()) {
+        maxDocsPerCall = Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+      }
+
+      BaseProjectOperator<?> projectOperator = getSortedByProject(expressions, 
maxDocsPerCall, orderByExpressions);
+      boolean asc = orderByExpressions.get(0).isAsc();
+      // Remember that we cannot use asc == projectOperator.isAscending() 
because empty operators are considered
+      // both ascending and descending
+      DocIdOrderedOperator.DocIdOrder queryOrder = 
DocIdOrderedOperator.DocIdOrder.fromAsc(asc);
+      if (projectOperator.isCompatibleWith(queryOrder)) {
+        return new SelectionPartiallyOrderedByLinearOperator(_indexSegment, 
_queryContext, expressions, projectOperator,
             sortedColumnsPrefixSize);
       } else {
-        BaseProjectOperator<?> projectOperator =
-            new ProjectPlanNode(_segmentContext, _queryContext, expressions, 
maxDocsPerCall).run();
         return new SelectionPartiallyOrderedByDescOperation(_indexSegment, 
_queryContext, expressions, projectOperator,
             sortedColumnsPrefixSize);
       }
@@ -120,6 +122,26 @@ public class SelectionPlanNode implements PlanNode {
     return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, projectOperator);
   }
 
+  private BaseProjectOperator<?> getSortedByProject(List<ExpressionContext> 
expressions, int maxDocsPerCall,
+      List<OrderByExpressionContext> orderByExpressions) {
+    BaseProjectOperator<?> projectOperator =
+        new ProjectPlanNode(_segmentContext, _queryContext, expressions, 
maxDocsPerCall).run();
+
+    boolean asc = orderByExpressions.get(0).isAsc();
+    if (!asc
+        && 
QueryOptionsUtils.isReverseOrderAllowed(_queryContext.getQueryOptions())
+        && 
!projectOperator.isCompatibleWith(DocIdOrderedOperator.DocIdOrder.DESC)) {
+      try {
+        return projectOperator.withOrder(DocIdOrderedOperator.DocIdOrder.DESC);
+      } catch (IllegalArgumentException | UnsupportedOperationException e) {
+        // This happens when the operator cannot provide the required order 
between blocks
+        // Fallback to SelectionOrderByOperator
+        return projectOperator;
+      }
+    }
+    return projectOperator;
+  }
+
   /**
    * This functions returns the number of expressions that are sorted by the 
implicit order in the index.
    *
@@ -174,17 +196,4 @@ public class SelectionPlanNode implements PlanNode {
       }
     }
   }
-
-  public enum OrderByAlgorithm {
-    NAIVE;
-
-    @Nullable
-    public static OrderByAlgorithm fromQueryContext(QueryContext queryContext) 
{
-      String orderByAlgorithm = 
QueryOptionsUtils.getOrderByAlgorithm(queryContext.getQueryOptions());
-      if (orderByAlgorithm == null) {
-        return null;
-      }
-      return OrderByAlgorithm.valueOf(orderByAlgorithm.toUpperCase());
-    }
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index ef3c0e1e540..ed70023d1d6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.intellij.lang.annotations.Language;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -114,7 +115,7 @@ public class SelectionCombineOperatorTest {
   }
 
   @Test
-  public void testSelectionLimit0() {
+  public void selectionLimit0() {
     SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM 
testTable LIMIT 0");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
@@ -130,7 +131,7 @@ public class SelectionCombineOperatorTest {
   }
 
   @Test
-  public void testSelectionOnly() {
+  public void selectionOnly() {
     SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM 
testTable");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
@@ -177,8 +178,10 @@ public class SelectionCombineOperatorTest {
   }
 
   @Test
-  public void testSelectionOrderBy() {
-    SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM 
testTable ORDER BY intColumn");
+  public void selectionOrderByAscending() {
+    SelectionResultsBlock combineResult = getCombineResult(
+        "SET allowReverseOrder=false;"
+        + "SELECT * FROM testTable ORDER BY intColumn");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
     List<Object[]> rows = combineResult.getRows();
@@ -198,11 +201,16 @@ public class SelectionCombineOperatorTest {
     int numSegmentsMatched = combineResult.getNumSegmentsMatched();
     assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
+  }
 
-    combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC");
+  @Test
+  public void selectionOrderByDescending() {
+    SelectionResultsBlock combineResult = getCombineResult(
+        "SET allowReverseOrder=false; "
+            + "SELECT * FROM testTable ORDER BY intColumn DESC");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    rows = combineResult.getRows();
+    List<Object[]> rows = combineResult.getRows();
     assertNotNull(rows);
     assertEquals(rows.size(), 10);
     int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
@@ -211,24 +219,85 @@ public class SelectionCombineOperatorTest {
     }
     // Should early-terminate after processing the result of the first 
segment. Each thread should process at most 1
     // segment.
-    numDocsScanned = combineResult.getNumDocsScanned();
+    long numDocsScanned = combineResult.getNumDocsScanned();
     assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
-        && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+        && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT,
+        "Expected number of docs scanned to be between " + 
NUM_RECORDS_PER_SEGMENT + " and "
+            + (QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * 
NUM_RECORDS_PER_SEGMENT) + ", but got: "
+            + numDocsScanned);
     assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
     assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
     assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
-    numSegmentsMatched = combineResult.getNumSegmentsMatched();
+    int numSegmentsMatched = combineResult.getNumSegmentsMatched();
     assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
+  }
 
-    combineResult = getCombineResult("SELECT * FROM testTable ORDER BY 
intColumn DESC LIMIT 10000");
+  @Test
+  public void selectionOrderByDescendingWithLargeLimit() {
+    SelectionResultsBlock combineResult = getCombineResult(
+        "SET allowReverseOrder=false; "
+            + "SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    rows = combineResult.getRows();
+    List<Object[]> rows = combineResult.getRows();
     assertNotNull(rows);
     assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
     // Should not early-terminate
-    numDocsScanned = combineResult.getNumDocsScanned();
+    long numDocsScanned = combineResult.getNumDocsScanned();
+    assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
+  }
+
+  @Test
+  public void selectionOrderByDescendingWithReverseOrder() {
+    int limit = 10;
+    SelectionResultsBlock combineResult = getCombineResult(
+        "SET allowReverseOrder=true; "
+            + "SELECT * FROM testTable "
+            + "ORDER BY intColumn DESC "
+            + "LIMIT " + limit);
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    List<Object[]> rows = combineResult.getRows();
+    assertNotNull(rows);
+    assertEquals(rows.size(), 10);
+    int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
+    for (int i = 0; i < 10; i++) {
+      assertEquals((int) rows.get(i)[0], expectedValue - i);
+    }
+    // With reverse order enabled, each thread should early-terminate after 
scanning roughly `limit` docs
+    // rather than a full segment (see the bounds asserted below).
+    long numDocsScanned = combineResult.getNumDocsScanned();
+    assertTrue(numDocsScanned >= limit
+        && numDocsScanned <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * limit,
"Expected number of docs scanned to be between " + limit + " and "
+            + (QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * limit) + 
", but got: "
+            + numDocsScanned);
+    assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+    assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
+    assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+    int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+    assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= 
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+    assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
+  }
+
+  @Test
+  public void selectionOrderByDescendingWithLargeLimitAndReverseOrder() {
+    SelectionResultsBlock combineResult = getCombineResult(
+        "SET allowReverseOrder=true; "
+            + "SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
+    assertEquals(combineResult.getDataSchema(),
+        new DataSchema(new String[]{INT_COLUMN}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+    List<Object[]> rows = combineResult.getRows();
+    assertNotNull(rows);
+    assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    // Should not early-terminate
+    long numDocsScanned = combineResult.getNumDocsScanned();
     assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
     assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
     assertEquals(combineResult.getNumEntriesScannedPostFilter(), 
numDocsScanned);
@@ -237,7 +306,7 @@ public class SelectionCombineOperatorTest {
     assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * 
NUM_RECORDS_PER_SEGMENT);
   }
 
-  private SelectionResultsBlock getCombineResult(String query) {
+  private SelectionResultsBlock getCombineResult(@Language("sql") String 
query) {
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(query);
     List<PlanNode> planNodes = new ArrayList<>(NUM_SEGMENTS);
     for (IndexSegment indexSegment : _indexSegments) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
index ad1783aebff..ccc7f578f7d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
@@ -128,7 +128,8 @@ public class DefaultAggregationExecutorTest {
     }
     int totalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     MatchAllFilterOperator matchAllFilterOperator = new 
MatchAllFilterOperator(totalDocs);
-    DocIdSetOperator docIdSetOperator = new 
DocIdSetOperator(matchAllFilterOperator, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+    DocIdSetOperator docIdSetOperator =
+        new DocIdSetOperator(matchAllFilterOperator, 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
     ProjectionOperator projectionOperator =
         new ProjectionOperator(dataSourceMap, docIdSetOperator, new 
QueryContext.Builder().build());
     TransformOperator transformOperator = new TransformOperator(_queryContext, 
projectionOperator, expressions);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 2132d395705..333b8db5de2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -28,8 +28,8 @@ import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
 import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
+import 
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByLinearOperator;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.testng.annotations.Test;
 
@@ -264,11 +264,10 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
   }
 
   @Test
-  public void testSelectionOrderBySortedColumn() {
-    // Test query order by single sorted column in ascending order
+  public void testSelectionOrderBySingleSortedColumnAsc() {
     String orderBy = " ORDER BY column5";
     BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(SELECTION_QUERY + orderBy);
-    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
     SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -285,137 +284,231 @@ public class 
InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
+  }
 
-    // Test query order by single sorted column in descending order
-    orderBy = " ORDER BY column5 DESC";
-    selectionOrderByOperator = getOperator(SELECTION_QUERY + orderBy);
-    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+  @Test
+  public void testSelectionOrderBySingleSortedColumnDescDidOrder() {
+    String orderBy = " ORDER BY column5 DESC";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = getOperator(
+        "SET allowReverseOrder = false; " + SELECTION_QUERY + orderBy);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation, "Expected: "
+        + SelectionPartiallyOrderedByDescOperation.class + ", found: " + 
selectionOrderByOperator.getClass());
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 30000 * (3 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column1", "column11"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.STRING});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
+  }
 
-    // Test query order by all sorted columns in ascending order
+  @Test
+  public void testSelectionOrderBySingleSortedColumnDescReverseOrder() {
+    String orderBy = " ORDER BY column5 DESC";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = getOperator(
+        "SET allowReverseOrder = true; " + SELECTION_QUERY + orderBy);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator, "Expected: "
+        + SelectionPartiallyOrderedByLinearOperator.class + ", found: " + 
selectionOrderByOperator.getClass());
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 10 * (3 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column1", "column11"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.STRING});
+    List<Object[]> selectionResult = resultsBlock.getRows();
+    assertEquals(selectionResult.size(), 10);
+    Object[] lastRow = selectionResult.get(9);
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
+  }
+
+  @Test
+  public void testSelectionOrderByAllSortedColumnsAsc() {
     String query = "SELECT column5, daysSinceEpoch FROM testTable ORDER BY 
column5, daysSinceEpoch";
-    selectionOrderByOperator = getOperator(query);
-    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 10L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 10 * (2 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 126164076);
+  }
 
-    // Test query order by all sorted columns in descending order
-    query = "SELECT column5 FROM testTable ORDER BY column5 DESC, 
daysSinceEpoch DESC";
-    selectionOrderByOperator = getOperator(query);
+  @Test
+  public void testSelectionOrderByAllSortedColumnsDescDidOrder() {
+    String query = "SET allowReverseOrder = false;"
+        + "SELECT column5 "
+        + "FROM testTable "
+        + "ORDER BY column5 DESC, daysSinceEpoch DESC";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
     assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 30000 * (2 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 167572854);
+  }
 
-    // Test query order by one sorted column in ascending order, the other 
sorted column in descending order
-    query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5, 
daysSinceEpoch DESC";
-    selectionOrderByOperator = getOperator(query);
-    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+  @Test
+  public void testSelectionOrderByAllSortedColumnsDescReverseOrder() {
+    String query = "SET allowReverseOrder = true;"
+        + "SELECT column5 "
+        + "FROM testTable "
+        + "ORDER BY column5 DESC, daysSinceEpoch DESC";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    // 10 * (2 columns)
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+    List<Object[]> selectionResult = resultsBlock.getRows();
+    assertEquals(selectionResult.size(), 10);
+    Object[] lastRow = selectionResult.get(9);
+    assertEquals(lastRow.length, 2);
+    assertEquals(lastRow[0], "gFuH");
+    assertEquals(lastRow[1], 167572854);
+  }
+
+  @Test
+  public void testSelectionOrderByMixedSortedColumns() {
+    String query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5, 
daysSinceEpoch DESC";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 30000 * (2 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 167572854);
+  }
 
-    // Test query order by one sorted column in ascending order, and some 
unsorted columns
-    query = "SELECT column1 FROM testTable ORDER BY column5, column6, column1";
-    selectionOrderByOperator = getOperator(query);
-    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByAscOperator);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+  @Test
+  public void testSelectionOrderBySortedAndUnsortedColumnsAsc() {
+    String query = "SELECT column1 FROM testTable ORDER BY column5, column6, 
column1";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 30000 * (3 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column6", "column1"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
-    // Unsorted column values should be the same as ordering by their own
     assertEquals(lastRow[1], 6043515);
     assertEquals(lastRow[2], 10542595);
+  }
 
-    // Test query order by one sorted column in descending order, and some 
unsorted columns
-    query = "SELECT column6 FROM testTable ORDER BY column5 DESC, column6, 
column1";
-    selectionOrderByOperator = getOperator(query);
+  @Test
+  public void testSelectionOrderBySortedAndUnsortedColumnsDescDidOrder() {
+    String query = "SET allowReverseOrder = false; "
+        + "SELECT column6 FROM testTable ORDER BY column5 DESC, column6, 
column1";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
     assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByDescOperation);
-    resultsBlock = selectionOrderByOperator.nextBlock();
-    executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
     assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     // 30000 * (3 columns)
     assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
     assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
-    dataSchema = resultsBlock.getDataSchema();
+    DataSchema dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column6", "column1"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT});
-    selectionResult = resultsBlock.getRows();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.get(9);
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
+    assertEquals(lastRow[1], 6043515);
     // Unsorted column values should be the same as ordering by their own
+    assertEquals(lastRow[2], 10542595);
+  }
+
+  @Test
+  public void testSelectionOrderBySortedAndUnsortedColumnsDescReverseOrder() {
+    String query = "SET allowReverseOrder = true; "
+        + "SELECT column6 FROM testTable ORDER BY column5 DESC, column6, 
column1";
+    BaseOperator<SelectionResultsBlock> selectionOrderByOperator = 
getOperator(query);
+    assertTrue(selectionOrderByOperator instanceof 
SelectionPartiallyOrderedByLinearOperator);
+    SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+    ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
+    assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+    assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+    assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+    assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+    DataSchema dataSchema = resultsBlock.getDataSchema();
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column5", 
"column6", "column1"});
+    assertEquals(dataSchema.getColumnDataTypes(),
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, 
ColumnDataType.INT});
+    List<Object[]> selectionResult = resultsBlock.getRows();
+    assertEquals(selectionResult.size(), 10);
+    Object[] lastRow = selectionResult.get(9);
+    assertEquals(lastRow.length, 3);
+    assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 6043515);
+    // Unsorted column values should be the same as ordering by their own
     assertEquals(lastRow[2], 10542595);
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
index 0ea1e8b8b95..b96d16b0c54 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
@@ -374,7 +374,8 @@ public class TextMatchTransformFunctionTest {
                 + "BROKER_REDUCE(sort:[startTime ASC],limit:1000) | 1 | 0\n"
                 + "COMBINE_SELECT_ORDERBY_MINMAX | 2 | 1\n"
                 + "PLAN_START(numSegmentsForThisPlan:1) | -1 | -1\n"
-                + "SELECT_PARTIAL_ORDER_BY_ASC(sortedList: (startTime), 
unsortedList: (), rest: (case(is_null(agent),"
+                + "SELECT_PARTIAL_ORDER_BY_LINEAR("
+                + "sortedList: (startTime), unsortedList: (), rest: 
(case(is_null(agent),"
                 + 
"'N/A',and(text_match(part,'_zz_'),is_not_null(part)),agent,''))) | 3 | 2\n"
                 + 
"TRANSFORM(case(is_null(agent),'N/A',and(text_match(part,'_zz_'),is_not_null(part)),agent,''),
 "
                 + "startTime) | 4 | 3\n"
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
index 41ca58636c6..884673617e6 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
@@ -21,14 +21,10 @@ package org.apache.pinot.perf;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.queries.BaseQueriesTest;
@@ -36,20 +32,12 @@ import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tools.SortedColumnQuickstart;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -77,46 +65,17 @@ public class BenchmarkOrderByQueries extends 
BaseQueriesTest {
 
   public static void main(String[] args)
       throws Exception {
-    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkOrderByQueries.class.getSimpleName());
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .include(BenchmarkOrderByQueries.class.getSimpleName());
     new Runner(opt.build()).run();
   }
 
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"FilteredAggregationsTest");
-  private static final String TABLE_NAME = "MyTable";
   private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
   private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
-  private static final String INT_COL_NAME = "INT_COL";
-  private static final String SORTED_COL_NAME = "SORTED_COL";
-  private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
-  private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
-  private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
-  private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
-  private static final String LOW_CARDINALITY_STRING_COL = 
"LOW_CARDINALITY_STRING_COL";
-  private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
-
-  private static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-      .setInvertedIndexColumns(List.of(INT_COL_NAME, 
LOW_CARDINALITY_STRING_COL))
-      .setFieldConfigList(FIELD_CONFIGS)
-      .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
-      .setSortedColumn(SORTED_COL_NAME)
-      .setRangeIndexColumns(List.of(INT_COL_NAME, LOW_CARDINALITY_STRING_COL))
-      .setStarTreeIndexConfigs(Collections.singletonList(
-          new StarTreeIndexConfig(Arrays.asList(SORTED_COL_NAME, 
INT_COL_NAME), null, Collections.singletonList(
-              new AggregationFunctionColumnPair(AggregationFunctionType.SUM, 
RAW_INT_COL_NAME).toColumnName()), null,
-              Integer.MAX_VALUE)))
-      .build();
-  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-      .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
-      .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
-      .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
-      .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
-      .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING)
-      .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
-      .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, 
FieldSpec.DataType.STRING)
-      .build();
-
-  @Param({"true", "false"})
-  private boolean _zasc; // called zasc just to force this parameter to be the 
last used in the report
+
+  @Param({"ASC", "NAIVE_DESC", "OPTIMIZED_DESC"})
+  private Mode _zMode; // called zMode just to force this parameter to be the 
last used in the report
   @Param("1500000")
   private int _numRows;
   //@Param({"EXP(0.5)"})
@@ -138,7 +97,10 @@ public class BenchmarkOrderByQueries extends 
BaseQueriesTest {
     buildSegment(FIRST_SEGMENT_NAME);
     buildSegment(SECOND_SEGMENT_NAME);
 
-    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(
+        SortedColumnQuickstart.SortedTable.TABLE_CONFIG,
+        SortedColumnQuickstart.SortedTable.SCHEMA
+    );
 
     ImmutableSegment firstImmutableSegment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME), 
indexLoadingConfig);
@@ -158,34 +120,16 @@ public class BenchmarkOrderByQueries extends 
BaseQueriesTest {
     EXECUTOR_SERVICE.shutdownNow();
   }
 
-  private List<GenericRow> createTestData(int numRows) {
-    Map<Integer, String> strings = new HashMap<>();
-    List<GenericRow> rows = new ArrayList<>();
-    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> 
"value" + i)
-        .toArray(String[]::new);
-    for (int i = 0; i < numRows; i += _primaryRepetitions) {
-      for (int j = 0; j < _primaryRepetitions; j++) {
-        GenericRow row = new GenericRow();
-        row.putValue(SORTED_COL_NAME, i);
-        row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
-        row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
-        row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
-        row.putValue(RAW_STRING_COL_NAME, strings.computeIfAbsent(
-            (int) _supplier.getAsLong(), k -> UUID.randomUUID().toString()));
-        row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
-        row.putValue(LOW_CARDINALITY_STRING_COL, lowCardinalityValues[(i + j) 
% lowCardinalityValues.length]);
-        rows.add(row);
-      }
-    }
-    return rows;
-  }
-
   private void buildSegment(String segmentName)
       throws Exception {
-    List<GenericRow> rows = createTestData(_numRows);
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, 
SCHEMA);
+    List<GenericRow> rows = 
SortedColumnQuickstart.SortedTable.streamData(_primaryRepetitions, _supplier)
+        .limit(_numRows)
+        .collect(Collectors.toCollection(() -> new ArrayList<>(_numRows)));
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(
+        SortedColumnQuickstart.SortedTable.TABLE_CONFIG,
+        SortedColumnQuickstart.SortedTable.SCHEMA);
     config.setOutDir(INDEX_DIR.getPath());
-    config.setTableName(TABLE_NAME);
+    
config.setTableName(SortedColumnQuickstart.SortedTable.SCHEMA.getSchemaName());
     config.setSegmentName(segmentName);
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
@@ -197,34 +141,57 @@ public class BenchmarkOrderByQueries extends 
BaseQueriesTest {
 
   @Benchmark
   public BrokerResponseNative sortedTotally() {
-    if (_zasc) {
-    return getBrokerResponse(
-        "SELECT SORTED_COL "
-              + "FROM MyTable "
-            + "ORDER BY SORTED_COL ASC "
-              + "LIMIT " + _limit);
-    } else {
-    return getBrokerResponse(
-        "SELECT SORTED_COL "
-              + "FROM MyTable "
-              + "ORDER BY SORTED_COL DESC "
-              + "LIMIT " + _limit);
+    switch (_zMode) {
+      case ASC:
+        return getBrokerResponse(
+            "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL ASC "
+                + "LIMIT " + _limit);
+      case NAIVE_DESC:
+        return getBrokerResponse(
+            "SET allowReverseOrder=false;"
+                + "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL DESC "
+                + "LIMIT " + _limit);
+      case OPTIMIZED_DESC:
+        return getBrokerResponse(
+            "SET allowReverseOrder=true;"
+                + "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL DESC "
+                + "LIMIT " + _limit);
+      default:
+        throw new IllegalStateException("Unknown mode: " + _zMode);
     }
   }
+
   @Benchmark
   public BrokerResponseNative sortedPartially() {
-    if (_zasc) {
-    return getBrokerResponse(
-        "SELECT SORTED_COL "
-              + "FROM MyTable "
-              + "ORDER BY SORTED_COL ASC, LOW_CARDINALITY_STRING_COL "
-              + "LIMIT " + _limit);
-    } else {
-    return getBrokerResponse(
-        "SELECT SORTED_COL "
-              + "FROM MyTable "
-            + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
-              + "LIMIT " + _limit);
+    switch (_zMode) {
+      case ASC:
+        return getBrokerResponse(
+            "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL ASC, LOW_CARDINALITY_STRING_COL "
+                + "LIMIT " + _limit);
+      case NAIVE_DESC:
+        return getBrokerResponse(
+            "SET allowReverseOrder=false;"
+                + "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
+                + "LIMIT " + _limit);
+      case OPTIMIZED_DESC:
+        return getBrokerResponse(
+            "SET allowReverseOrder=true;"
+                + "SELECT SORTED_COL "
+                + "FROM sorted "
+                + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
+                + "LIMIT " + _limit);
+      default:
+        throw new IllegalStateException("Unknown mode: " + _zMode);
     }
   }
 
@@ -242,4 +209,8 @@ public class BenchmarkOrderByQueries extends 
BaseQueriesTest {
  protected List<IndexSegment> getIndexSegments() {
    // Segments the benchmarked queries run against; presumably populated during benchmark
    // setup from the two generated segments — setup code is outside this hunk, verify there.
    return _indexSegments;
  }
+
  public enum Mode {
    // ASC: plain ascending ORDER BY.
    // NAIVE_DESC: descending ORDER BY with allowReverseOrder=false (baseline path).
    // OPTIMIZED_DESC: descending ORDER BY with allowReverseOrder=true (exercises the new
    // reverse-order optimization added by this change).
    ASC, NAIVE_DESC, OPTIMIZED_DESC
  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
index 191b1b62c40..f09e06ae11a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
@@ -63,6 +63,16 @@ public final class FixedBitSVForwardIndexReaderV2 implements 
ForwardIndexReader<
 
   @Override
   public void readDictIds(int[] docIds, int length, int[] dictIdBuffer, 
ForwardIndexReaderContext context) {
+    int firstDocId = docIds[0];
+    int lastDocId = docIds[length - 1];
+    if (firstDocId <= lastDocId) {
+      readDictIdsAsc(docIds, length, dictIdBuffer, context);
+    } else {
+      readDictIdsDesc(docIds, length, dictIdBuffer, context);
+    }
+  }
+
+  private void readDictIdsAsc(int[] docIds, int length, int[] dictIdBuffer, 
ForwardIndexReaderContext context) {
     int firstDocId = docIds[0];
     int lastDocId = docIds[length - 1];
     int index = 0;
@@ -98,6 +108,13 @@ public final class FixedBitSVForwardIndexReaderV2 
implements ForwardIndexReader<
     }
   }
 
  // Reads dict ids for a strictly descending doc-id batch into {@code dictIdBuffer}, one
  // value per docIds[i]. Unlike the ascending path, no bulk/word-aligned reads are done yet.
  private void readDictIdsDesc(int[] docIds, int length, int[] dictIdBuffer, ForwardIndexReaderContext context) {
    // TODO: Implement bulk read for descending order
    for (int i = 0; i < length; i++) {
      dictIdBuffer[i] = _reader.read(docIds[i]);
    }
  }
+
   @Override
   public void close() {
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7efc2c3c593..ed54afa62e5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -687,7 +687,8 @@ public class CommonConstants {
        // Query option key used to enable a given set of rules that are disabled by default
         public static final String USE_PLANNER_RULES = "usePlannerRules";
 
-        public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
+        public static final String ALLOW_REVERSE_ORDER = "allowReverseOrder";
+        public static final boolean DEFAULT_ALLOW_REVERSE_ORDER = false;
 
         public static final String MULTI_STAGE_LEAF_LIMIT = 
"multiStageLeafLimit";
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java
new file mode 100644
index 00000000000..b7756ace228
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+
+/// Notice: In order to run this quickstart, you need to run the SortedTable 
main method once to
+/// generate the content in the 
pinot-tools/target/classes/examples/batch/sorted folder.
+public class SortedColumnQuickstart extends Quickstart {
+  private static final String QUICKSTART_IDENTIFIER = "SORTED";
  @Override
  public List<String> types() {
    // The single identifier this quickstart registers under (selected via "-type SORTED").
    return Collections.singletonList(QUICKSTART_IDENTIFIER);
  }
+
  @Override
  protected String[] getDefaultBatchTableDirectories() {
    // Contrary to other quickstarts, here we create the content automatically.
    // Note this is not a file path but a resource path on the classpath, which means the
    // content can be generated into the pinot-tools/target/classes/examples/batch/sorted folder.
    return new String[] { "examples/batch/sorted" };
  }
+
  @Override
  protected Map<String, String> getDefaultStreamTableDirectories() {
    // Batch-only quickstart: no stream tables are bootstrapped.
    return Map.of();
  }
+
+  public static void main(String[] args)
+      throws Exception {
+    List<String> arguments = new ArrayList<>();
+    arguments.addAll(Arrays.asList("QuickStart", "-type", 
QUICKSTART_IDENTIFIER));
+    arguments.addAll(Arrays.asList(args));
+    PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+  }
+
+  public static class SortedTable {
+    private static final String TABLE_NAME = "sorted";
+    private static final String INT_COL_NAME = "INT_COL";
+    private static final String SORTED_COL_NAME = "SORTED_COL";
+    private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+    private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+    private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+    private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+    private static final String LOW_CARDINALITY_STRING_COL = 
"LOW_CARDINALITY_STRING_COL";
+    private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
+
+    public static final TableConfig TABLE_CONFIG = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(List.of(INT_COL_NAME, 
LOW_CARDINALITY_STRING_COL))
+        .setFieldConfigList(FIELD_CONFIGS)
+        .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
+        .setSortedColumn(SORTED_COL_NAME)
+        .setRangeIndexColumns(List.of(INT_COL_NAME, 
LOW_CARDINALITY_STRING_COL))
+        .setStarTreeIndexConfigs(Collections.singletonList(
+            new StarTreeIndexConfig(Arrays.asList(SORTED_COL_NAME, 
INT_COL_NAME), null, Collections.singletonList(
+                new AggregationFunctionColumnPair(AggregationFunctionType.SUM, 
RAW_INT_COL_NAME).toColumnName()), null,
+                Integer.MAX_VALUE)))
+        .build();
+    public static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+        .addSingleValueDimension(RAW_STRING_COL_NAME, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(NO_INDEX_STRING_COL, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LOW_CARDINALITY_STRING_COL, 
FieldSpec.DataType.STRING)
+        .build();
+
+    public static void main(String[] args)
+        throws Exception {
+      Path activeDir = new File("").getAbsoluteFile().toPath();
+      Path pinotToolsPath;
+      if (activeDir.endsWith("pinot")) {
+        pinotToolsPath = activeDir.resolve("pinot-tools");
+      } else {
+        pinotToolsPath = activeDir;
+      }
+      Path bootstrapDir = pinotToolsPath.resolve(Path.of("src", "main", 
"resources", "examples", "batch", "sorted"));
+      prepareBootstrapDir(bootstrapDir);
+    }
+
+    private static void prepareBootstrapDir(Path bootstrapDir)
+        throws IOException {
+      File bootstrapDirFile = bootstrapDir.toFile();
+      bootstrapDirFile.mkdirs();
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      File tableConfigFile = 
bootstrapDir.resolve("sorted_offline_table_config.json").toFile();
+      
objectMapper.writerWithDefaultPrettyPrinter().writeValue(tableConfigFile, 
SortedTable.TABLE_CONFIG);
+      File schemaFile = bootstrapDir.resolve("sorted_schema.json").toFile();
+      objectMapper.writerWithDefaultPrettyPrinter().writeValue(schemaFile, 
SortedTable.SCHEMA);
+
+      File rawdata = bootstrapDir.resolve("rawdata").toFile();
+      rawdata.mkdirs();
+
+      try (FileWriter fw = new FileWriter(new File(rawdata, "data.csv"))) {
+        // write the CSV header
+        fw.append(String.join(",", SortedTable.SORTED_COL_NAME, 
SortedTable.INT_COL_NAME,
+            SortedTable.NO_INDEX_INT_COL_NAME, SortedTable.RAW_INT_COL_NAME, 
SortedTable.RAW_STRING_COL_NAME,
+            SortedTable.NO_INDEX_STRING_COL, 
SortedTable.LOW_CARDINALITY_STRING_COL)).append('\n');
+
+        Random r = new Random(42);
+        streamData(1000, r::nextLong)
+            .limit(100_000)
+            .forEach(s -> {
+              // generate CSV line
+              try {
+                
fw.append(String.valueOf(s.getValue(SortedTable.SORTED_COL_NAME))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.INT_COL_NAME))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.NO_INDEX_INT_COL_NAME))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.RAW_INT_COL_NAME))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.RAW_STRING_COL_NAME))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.NO_INDEX_STRING_COL))).append(',');
+                
fw.append(String.valueOf(s.getValue(SortedTable.LOW_CARDINALITY_STRING_COL))).append('\n');
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            });
+      }
+    }
+
    // Produces an INFINITE stream of rows sorted ascending on SORTED_COL: each value of i is
    // repeated primaryRepetitions times before moving on, so callers MUST apply .limit(...).
    // NOTE(review): the shared `strings` HashMap is mutated from inside the pipeline, so the
    // returned stream is not safe to consume in parallel — keep it sequential.
    public static Stream<GenericRow> streamData(int primaryRepetitions, LongSupplier longSupplier) {
      // Interns random-int keyed UUID strings so RAW_STRING_COL values repeat across rows.
      Map<Integer, String> strings = new HashMap<>();
      // Ten fixed values ("value0".."value9") for the low-cardinality column.
      String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> "value" + i)
          .toArray(String[]::new);
      return IntStream.iterate(0, i -> i + 1)
          .boxed()
          .flatMap(i -> IntStream.range(0, primaryRepetitions)
              .mapToObj(j -> {
                GenericRow row = new GenericRow();
                row.putValue(SortedTable.SORTED_COL_NAME, i);
                row.putValue(SortedTable.INT_COL_NAME, (int) longSupplier.getAsLong());
                row.putValue(SortedTable.NO_INDEX_INT_COL_NAME, (int) longSupplier.getAsLong());
                row.putValue(SortedTable.RAW_INT_COL_NAME, (int) longSupplier.getAsLong());
                row.putValue(SortedTable.RAW_STRING_COL_NAME, strings.computeIfAbsent(
                    (int) longSupplier.getAsLong(), k -> UUID.randomUUID().toString()));
                // NO_INDEX_STRING_COL intentionally mirrors RAW_STRING_COL for this row.
                row.putValue(SortedTable.NO_INDEX_STRING_COL, row.getValue(SortedTable.RAW_STRING_COL_NAME));
                int lowCardinalityIndex = (i + j) % lowCardinalityValues.length;
                row.putValue(SortedTable.LOW_CARDINALITY_STRING_COL, lowCardinalityValues[lowCardinalityIndex]);
                return row;
              })
          );
    }
+  }
+}
diff --git 
a/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml 
b/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml
new file mode 100644
index 00000000000..e17fe2e93e4
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines ingestion jobs to be running.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # Class to use for segment generation and different push types.
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+  segmentMetadataPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
+
+# jobType: Pinot ingestion job type.
+# Supported job types are defined in PinotIngestionJobType class.
+#   'SegmentCreation'
+#   'SegmentTarPush'
+#   'SegmentUriPush'
+#   'SegmentMetadataPush'
+#   'SegmentCreationAndTarPush'
+#   'SegmentCreationAndUriPush'
+#   'SegmentCreationAndMetadataPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data, expected to have scheme 
configured in PinotFS.
+inputDirURI: 'examples/batch/sorted/rawdata'
+
+# includeFileNamePattern: include file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.csv' will include all csv files just under the inputDirURI, not 
sub directories;
+#   'glob:**/*.csv' will include all the csv files under inputDirURI 
recursively.
+includeFileNamePattern: 'glob:**/*.csv'
+
+# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
+# Sample usage:
+#   'glob:*.csv' will exclude all csv files just under the inputDirURI, not 
sub directories;
+#   'glob:**/*.csv' will exclude all the csv files under inputDirURI 
recursively.
+# _excludeFileNamePattern: ''
+
+# outputDirURI: Root directory of output segments, expected to have scheme 
configured in PinotFS.
+outputDirURI: 'examples/batch/sorted/segments'
+
+# segmentCreationJobParallelism: Parallelism to build Pinot segments.
+segmentCreationJobParallelism: 4
+
+# overwriteOutput: Overwrite output segments if existed.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+  - # scheme: used to identify a PinotFS.
+    # E.g. local, hdfs, dbfs, etc
+    scheme: file
+
+    # className: Class name used to create the PinotFS instance.
+    # E.g.
+    #   org.apache.pinot.spi.filesystem.LocalPinotFS is used for local 
filesystem
+    #   org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data 
Lake
+    #   org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+    className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines all record reader
+recordReaderSpec:
+
+  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 
'json', 'thrift' etc.
+  dataFormat: 'csv'
+
+  # className: Corresponding RecordReader class name.
+  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
+
+# tableSpec: defines table name and where to fetch corresponding table config 
and table schema.
+tableSpec:
+
+  # tableName: Table name
+  tableName: 'sorted'
+
+  # schemaURI: defines where to read the table schema, supports PinotFS or 
HTTP.
+  # E.g.
+  #   hdfs://path/to/table_schema.json
+  #   http://localhost:9000/tables/myTable/schema
+  schemaURI: 'http://localhost:9000/tables/sorted/schema'
+
+  # tableConfigURI: defines where to read the table config.
+  # Supports using PinotFS or HTTP.
+  # E.g.
+  #   hdfs://path/to/table_config.json
+  #   http://localhost:9000/tables/myTable
+  # Note that the API to read Pinot table config directly from pinot 
controller contains a JSON wrapper.
+  # The real table config is the object under the field 'OFFLINE'.
+  tableConfigURI: 'http://localhost:9000/tables/sorted'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+  - # controllerURI: used to fetch table/schema information and data push.
+    # E.g. http://localhost:9000
+    controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+  # pushAttempts: number of attempts for push job, default is 1, which means 
no retry.
+  pushAttempts: 2
+
+  # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
+  pushRetryIntervalMillis: 1000


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to