Jackie-Jiang commented on code in PR #8979:
URL: https://github.com/apache/pinot/pull/8979#discussion_r955319247


##########
pinot-core/src/main/java/org/apache/pinot/core/common/NullableRowBasedBlockValueFetcher.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common;
+
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class NullableRowBasedBlockValueFetcher extends 
RowBasedBlockValueFetcher {

Review Comment:
   I checked all the usages of `RowBasedBlockValueFetcher`; we should just change it to take a boolean `nullHandlingEnabled`. There is no need to make a subclass for it.
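
   A minimal sketch of the flag-based approach, not the actual Pinot class: `RowFetcherSketch`, its column-major `Object[][]` input, and the use of `java.util.BitSet` in place of `RoaringBitmap` are all assumptions made to keep the example self-contained. It only illustrates how one class can cover both cases when the constructor takes `nullHandlingEnabled`.

```java
import java.util.BitSet;

// Hypothetical stand-in for RowBasedBlockValueFetcher; names and types are illustrative only.
final class RowFetcherSketch {
  private final Object[][] _columns;       // column-major values: _columns[col][docId]
  private final BitSet[] _nullBitmaps;     // per-column null docIds; an entry may be null
  private final boolean _nullHandlingEnabled;

  RowFetcherSketch(Object[][] columns, BitSet[] nullBitmaps, boolean nullHandlingEnabled) {
    _columns = columns;
    _nullBitmaps = nullBitmaps;
    _nullHandlingEnabled = nullHandlingEnabled;
  }

  // Builds one row; null bitmaps are consulted only when null handling is enabled.
  Object[] getRow(int docId) {
    int numColumns = _columns.length;
    Object[] row = new Object[numColumns];
    for (int i = 0; i < numColumns; i++) {
      boolean isNull = _nullHandlingEnabled && _nullBitmaps[i] != null && _nullBitmaps[i].get(docId);
      row[i] = isNull ? null : _columns[i][docId];
    }
    return row;
  }
}
```

   With the flag set to `false` the fetcher returns the stored values unchanged, so existing callers would keep their behavior.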



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java:
##########
@@ -50,67 +52,113 @@ public Operator<IntermediateResultsBlock> run() {
     List<ExpressionContext> expressions = 
SelectionOperatorUtils.extractExpressions(_queryContext, _indexSegment);
     int limit = _queryContext.getLimit();
 
-    if (limit > 0) {
-      List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
-      if (orderByExpressions == null) {
-        // Selection only
-        TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-            Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-        return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
-      } else {
-        // Selection order-by
-        if (isAllOrderByColumnsSorted(orderByExpressions)) {
-          // All order-by columns are sorted, no need to sort the records
-          TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-              Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, true);
-        } else if (orderByExpressions.size() == expressions.size()) {
-          // All output expressions are ordered
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        } else {
-          // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
-          // unnecessary data fetch
-          List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(orderByExpressions.size());
-          for (OrderByExpressionContext orderByExpression : 
orderByExpressions) {
-            expressionsToTransform.add(orderByExpression.getExpression());
-          }
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform,
-                  DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        }
-      }
-    } else {
+    if (limit == 0) {
       // Empty selection (LIMIT 0)
       TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions, 0).run();
       return new EmptySelectionOperator(_indexSegment, expressions, 
transformOperator);
     }
+    List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
+
+    if (orderByExpressions == null) {
+      // Selection only
+      // ie: SELECT ... FROM Table WHERE ... LIMIT 10
+      int actualLimit = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);

Review Comment:
   `actualLimit` is confusing here. Either inline it, as in the original code, or rename it:
   ```suggestion
         int maxDocsPerCall = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java:
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for order-by queries DESC that are partially sorted over the 
sorting keys.
+ *
+ * @see LinearSelectionOrderByOperator
+ */
+public class SelectionPartiallyOrderedByDescOperation extends 
LinearSelectionOrderByOperator {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_DESC";
+
+  private int _numDocsScanned = 0;
+  private long _numEntriesScannedPostFilter = 0;
+
+  public SelectionPartiallyOrderedByDescOperation(IndexSegment indexSegment, 
QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator, int sortedExpr) {
+    super(indexSegment, queryContext, expressions, transformOperator, 
sortedExpr);
+    assert queryContext.getOrderByExpressions() != null;
+    Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
+            .filter(expr -> expr.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("The query is not 
order by identifiers"))
+            .isDesc(),
+        "%s can only be used when the first column in order by is DESC", 
EXPLAIN_NAME);
+  }
+
+  @Override
+  protected List<Object[]> fetch(Supplier<ListBuilder> listBuilderSupplier) {
+
+    // Ideally we would use a descending cursor, but we don't actually have 
them
+    // Alternatively, we could store all blocks in a list and iterate them in 
reverse order, but ProjectionBlocks share
+    // the same DataBlockCache, so they may be ephemeral and being overridden 
by the next block.
+    // The only alternative we have right now is to retrieve the last LIMIT 
elements from each block
+
+    TransformBlock block;
+    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+
+    List<List<Object[]>> rowsPerBlock = new ArrayList<>();

Review Comment:
   This will cause a memory issue when the `LIMIT` is large (e.g. when the `LIMIT` is 10000, which is the same as the block size, we will cache all rows in memory).
   
   One way to avoid it is to keep only the rows selected up to the current block. When a new block arrives and it does not provide enough rows to satisfy the `LIMIT`, fill the remainder with the records kept from the previous blocks and update the kept rows.
   
   When the `LIMIT` is larger than the block size, this will always add overhead, so we should avoid using this operator in that case.
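
   The bounded approach described above can be sketched as follows. This is a simplified illustration, not the operator's code: `BoundedDescSelectionSketch` is a hypothetical name, rows are reduced to single `Integer` values, and the blocks are assumed to arrive fully sorted in ascending order. Only the rows kept so far (at most `limit`) are retained between blocks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; rows are plain Integers and blocks are assumed sorted ascending.
final class BoundedDescSelectionSketch {
  private BoundedDescSelectionSketch() {
  }

  // Returns the last `limit` values across all blocks, in descending order.
  static List<Integer> lastRowsDescending(Iterable<List<Integer>> blocks, int limit) {
    List<Integer> kept = new ArrayList<>();
    for (List<Integer> block : blocks) {
      List<Integer> candidate = new ArrayList<>();
      // The newest block holds the largest values, so read it from the end first.
      for (int i = block.size() - 1; i >= 0 && candidate.size() < limit; i--) {
        candidate.add(block.get(i));
      }
      // Not enough rows in this block: top up with rows kept from previous blocks.
      for (int i = 0; i < kept.size() && candidate.size() < limit; i++) {
        candidate.add(kept.get(i));
      }
      kept = candidate;
    }
    return kept;
  }
}
```

   At any point the sketch holds at most two lists of `limit` rows, instead of one list per block.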



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java:
##########
@@ -587,4 +600,101 @@ public BlockDocIdValueSet getBlockDocIdValueSet() {
   public BlockMetadata getMetadata() {
     throw new UnsupportedOperationException();
   }
+
+  public static class ListSelection extends IntermediateResultsBlock {

Review Comment:
   I suggest not creating subclasses for now. For the purpose of this PR, we can add the `_comparator` to `IntermediateResultsBlock` and change the constructor to `public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult, @Nullable Comparator comparator, boolean nullHandlingEnabled)`.
   
   I'm planning to split `IntermediateResultsBlock` into a results block for each query type (a similar idea to this one), but IMO it is better to do that in a separate PR.
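
   A rough sketch of a single block class carrying an optional comparator, under stated assumptions: `SelectionResultsBlockSketch` and its `mergeWith` method are hypothetical, `DataSchema` is omitted, and the way the comparator is used during merging is a guess for illustration rather than the behavior of `IntermediateResultsBlock`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a results block with a nullable comparator; not Pinot's class.
final class SelectionResultsBlockSketch {
  private final Collection<Object[]> _selectionResult;
  private final Comparator<Object[]> _comparator;   // null for selection-only queries
  private final boolean _nullHandlingEnabled;

  SelectionResultsBlockSketch(Collection<Object[]> selectionResult, Comparator<Object[]> comparator,
      boolean nullHandlingEnabled) {
    _selectionResult = selectionResult;
    _comparator = comparator;
    _nullHandlingEnabled = nullHandlingEnabled;
  }

  boolean isNullHandlingEnabled() {
    return _nullHandlingEnabled;
  }

  // Combines two blocks; rows are ordered only when a comparator is present.
  List<Object[]> mergeWith(SelectionResultsBlockSketch other, int limit) {
    List<Object[]> merged = new ArrayList<>(_selectionResult);
    merged.addAll(other._selectionResult);
    if (_comparator != null) {
      merged.sort(_comparator);
    }
    return new ArrayList<>(merged.subList(0, Math.min(limit, merged.size())));
  }
}
```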



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java:
##########
@@ -50,67 +52,113 @@ public Operator<IntermediateResultsBlock> run() {
     List<ExpressionContext> expressions = 
SelectionOperatorUtils.extractExpressions(_queryContext, _indexSegment);
     int limit = _queryContext.getLimit();
 
-    if (limit > 0) {
-      List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
-      if (orderByExpressions == null) {
-        // Selection only
-        TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-            Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-        return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
-      } else {
-        // Selection order-by
-        if (isAllOrderByColumnsSorted(orderByExpressions)) {
-          // All order-by columns are sorted, no need to sort the records
-          TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-              Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, true);
-        } else if (orderByExpressions.size() == expressions.size()) {
-          // All output expressions are ordered
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        } else {
-          // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
-          // unnecessary data fetch
-          List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(orderByExpressions.size());
-          for (OrderByExpressionContext orderByExpression : 
orderByExpressions) {
-            expressionsToTransform.add(orderByExpression.getExpression());
-          }
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform,
-                  DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        }
-      }
-    } else {
+    if (limit == 0) {
       // Empty selection (LIMIT 0)
       TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions, 0).run();
       return new EmptySelectionOperator(_indexSegment, expressions, 
transformOperator);
     }
+    List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
+
+    if (orderByExpressions == null) {
+      // Selection only
+      // ie: SELECT ... FROM Table WHERE ... LIMIT 10
+      int actualLimit = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+      TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, actualLimit);
+      TransformOperator transformOperator = planNode.run();
+
+      return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
+    }
+    int numOrderByExpressions = orderByExpressions.size();
+    // Although it is a break of abstraction, some code, specially merging, 
assumes that if there is an order by
+    // expression the operator will return a block whose selection result is a 
priority queue.
+    int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions);

Review Comment:
   When null handling is enabled and the sorted column contains `null`, we cannot use the optimized operator because the sorted assumption no longer holds. A `null` value can be stored as any default value, but when ordering we want to put it at the end.
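
   A small illustration of why the sorted assumption breaks. Everything here is hypothetical: `NullOrderingSketch`, the `NULL_PLACEHOLDER` value, and the `BitSet` of null docIds are stand-ins chosen for the example. The stored column is sorted, but the doc whose placeholder sorts first is actually `null`, so reading docs in storage order does not give a nulls-last ordering.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;

// Hypothetical example; the placeholder value and class name are assumptions.
final class NullOrderingSketch {
  static final int NULL_PLACEHOLDER = Integer.MIN_VALUE;

  private NullOrderingSketch() {
  }

  // Values in docId (storage) order, with nulls restored from the null bitmap.
  static List<Integer> storageOrder(int[] storedValues, BitSet nullDocIds) {
    List<Integer> values = new ArrayList<>();
    for (int docId = 0; docId < storedValues.length; docId++) {
      values.add(nullDocIds.get(docId) ? null : storedValues[docId]);
    }
    return values;
  }

  // The order the query should return for ORDER BY col ASC with nulls at the end.
  static List<Integer> nullsLastAscending(List<Integer> values) {
    List<Integer> sorted = new ArrayList<>(values);
    sorted.sort(Comparator.nullsLast(Comparator.<Integer>naturalOrder()));
    return sorted;
  }
}
```

   For stored values `{NULL_PLACEHOLDER, 1, 2, 3}` with doc 0 marked null, storage order yields the null first while the nulls-last ordering puts it after 3, so the two orders disagree.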



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java:
##########
@@ -50,67 +52,113 @@ public Operator<IntermediateResultsBlock> run() {
     List<ExpressionContext> expressions = 
SelectionOperatorUtils.extractExpressions(_queryContext, _indexSegment);
     int limit = _queryContext.getLimit();
 
-    if (limit > 0) {
-      List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
-      if (orderByExpressions == null) {
-        // Selection only
-        TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-            Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-        return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
-      } else {
-        // Selection order-by
-        if (isAllOrderByColumnsSorted(orderByExpressions)) {
-          // All order-by columns are sorted, no need to sort the records
-          TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions,
-              Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL)).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, true);
-        } else if (orderByExpressions.size() == expressions.size()) {
-          // All output expressions are ordered
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        } else {
-          // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
-          // unnecessary data fetch
-          List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(orderByExpressions.size());
-          for (OrderByExpressionContext orderByExpression : 
orderByExpressions) {
-            expressionsToTransform.add(orderByExpression.getExpression());
-          }
-          TransformOperator transformOperator =
-              new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform,
-                  DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
-          return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator, false);
-        }
-      }
-    } else {
+    if (limit == 0) {
       // Empty selection (LIMIT 0)
       TransformOperator transformOperator = new 
TransformPlanNode(_indexSegment, _queryContext, expressions, 0).run();
       return new EmptySelectionOperator(_indexSegment, expressions, 
transformOperator);
     }
+    List<OrderByExpressionContext> orderByExpressions = 
_queryContext.getOrderByExpressions();
+
+    if (orderByExpressions == null) {
+      // Selection only
+      // ie: SELECT ... FROM Table WHERE ... LIMIT 10
+      int actualLimit = Math.min(limit, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+      TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, actualLimit);
+      TransformOperator transformOperator = planNode.run();
+
+      return new SelectionOnlyOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
+    }
+    int numOrderByExpressions = orderByExpressions.size();
+    // Although it is a break of abstraction, some code, specially merging, 
assumes that if there is an order by
+    // expression the operator will return a block whose selection result is a 
priority queue.
+    int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions);
+    if (sortedColumnsPrefixSize > 0 && isOrderByOptimizationEnabled()) {
+      // The first order by expressions are sorted (either asc or desc).
+      // ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column 
DESC LIMIT 10 OFFSET 5
+      // or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column, 
not_sorted LIMIT 10 OFFSET 5
+      // but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted, 
sorted_column LIMIT 10 OFFSET 5
+      if (orderByExpressions.get(0).isAsc()) {
+        int actualLimit = Math.min(limit + _queryContext.getOffset(), 
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+        TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, actualLimit);
+        TransformOperator transformOperator = planNode.run();
+        return new SelectionPartiallyOrderedByAscOperator(_indexSegment, 
_queryContext, expressions,
+            transformOperator, sortedColumnsPrefixSize);
+      } else {
+        int actualLimit = DocIdSetPlanNode.MAX_DOC_PER_CALL;
+        TransformPlanNode planNode = new TransformPlanNode(_indexSegment, 
_queryContext, expressions, actualLimit);
+        TransformOperator transformOperator = planNode.run();
+        return new SelectionPartiallyOrderedByDescOperation(_indexSegment, 
_queryContext, expressions,
+            transformOperator, sortedColumnsPrefixSize);
+      }
+    }
+    if (numOrderByExpressions == expressions.size()) {
+      // All output expressions are ordered
+      // ie: SELECT not_sorted1, not_sorted2 FROM Table WHERE ... ORDER BY 
not_sorted1, not_sorted2 LIMIT 10 OFFSET 5
+      TransformOperator transformOperator =
+          new TransformPlanNode(_indexSegment, _queryContext, expressions, 
DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+      return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
+    }
+    // Not all output expressions are ordered, only fetch the order-by 
expressions and docId to avoid the
+    // unnecessary data fetch
+    // ie: SELECT ... FROM Table WHERE ... ORDER BY not_sorted1, not_sorted2 
LIMIT 10
+    List<ExpressionContext> expressionsToTransform = new 
ArrayList<>(numOrderByExpressions);
+    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
+      expressionsToTransform.add(orderByExpression.getExpression());
+    }
+    TransformOperator transformOperator =
+        new TransformPlanNode(_indexSegment, _queryContext, 
expressionsToTransform,
+            DocIdSetPlanNode.MAX_DOC_PER_CALL).run();
+    return new SelectionOrderByOperator(_indexSegment, _queryContext, 
expressions, transformOperator);
   }
 
   /**
-   *  This function checks whether all columns in order by clause are 
pre-sorted.
-   *  This is used to optimize order by limit clauses.
-   *  For eg:
-   *  A query like "select * from table order by col1, col2 limit 10"
-   *  will take all the n matching rows and add it to a priority queue of size 
10.
-   *  This is nlogk operation which can be quite expensive for a large n.
-   *  In the above example, if the docs in the segment are already sorted by 
col1 and col2 then there is no need for
-   *  sorting at all (only limit is needed).
-   * @return true is all columns in order by clause are sorted . False 
otherwise
+   * This functions returns the number of expressions that are sorted by the 
implicit order in the index.
+   *
+   * This means that query that uses these expressions in its order by doesn't 
actually need to sort from 0 to the
+   * given number (excluded) of expressions, as they are returned in the 
correct order.
+   *
+   * This method supports ASC and DESC order and ensures that all prefix 
expressions follow the same order. For example,
+   * ORDER BY sorted_col1 ASC, sorted_col2 ASC and ORDER BY sorted_col1 DESC, 
sorted_col2 DESC will return 2 but
+   * ORDER BY sorted_col1 DESC, sorted_col2 ASC and ORDER BY sorted_col1 ASC, 
sorted_col2 DESC will return 1 while
+   * ORDER BY not_sorted, sorted_col1 will return 0 because the first column 
is not sorted.
+   *
+   * It doesn't make sense to add literal expressions in an order by 
expression, but if they are included, they are
+   * considered sorted and its ASC/DESC is ignored.
+   *
+   * @return the max number that guarantees that from the first expression to 
the returned number, the index is already
+   * sorted.
    */
-  private boolean isAllOrderByColumnsSorted(List<OrderByExpressionContext> 
orderByExpressions) {
-    for (OrderByExpressionContext orderByExpression : orderByExpressions) {
-      if (!(orderByExpression.getExpression().getType() == 
ExpressionContext.Type.IDENTIFIER)
-          || !orderByExpression.isAsc()) {
-        return false;
+  private int getSortedColumnsPrefix(List<OrderByExpressionContext> 
orderByExpressions) {
+    boolean asc = orderByExpressions.get(0).isAsc();
+    for (int i = 0; i < orderByExpressions.size(); i++) {
+      if (!isSorted(orderByExpressions.get(i), asc)) {
+        return i;
       }
-      String column = orderByExpression.getExpression().getIdentifier();
-      if 
(!_indexSegment.getDataSource(column).getDataSourceMetadata().isSorted()) {
+    }
+    // If we reach here, all are sorted
+    return orderByExpressions.size();
+  }
+
+  private boolean isSorted(OrderByExpressionContext orderByExpression, boolean 
asc) {
+    switch (orderByExpression.getExpression().getType()) {
+      case LITERAL: {
+        return true;
+      }
+      case IDENTIFIER: {
+        if (!orderByExpression.isAsc() == asc) {
+          return false;
+        }
+        String column = orderByExpression.getExpression().getIdentifier();
+        return 
_indexSegment.getDataSource(column).getDataSourceMetadata().isSorted();
+      }
+      case FUNCTION: // we could optimize monotonically increasing functions
+      default: {
         return false;
       }
     }
-    return true;
+  }
+
+  private boolean isOrderByOptimizationEnabled() {

Review Comment:
   I don't think we need this, since it is on by default. If you want the ability to turn it off, put it in the `QueryOptions`.
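
   One possible shape for such an option, as a sketch only: the option key `useSortedPrefixOrderBy` and the class `OrderByOptionSketch` are invented for illustration and are not existing Pinot names. The query options are modeled as a plain `Map<String, String>`, and the optimization stays on unless the option is explicitly set to `false`.

```java
import java.util.Map;

// Hypothetical helper; the option key below is an assumption, not a real Pinot option.
final class OrderByOptionSketch {
  static final String OPTION_KEY = "useSortedPrefixOrderBy";

  private OrderByOptionSketch() {
  }

  // Enabled by default; only an explicit non-"true" value turns it off.
  static boolean isOrderByOptimizationEnabled(Map<String, String> queryOptions) {
    String value = queryOptions.get(OPTION_KEY);
    return value == null || Boolean.parseBoolean(value);
  }
}
```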



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

