gortiz commented on code in PR #8979:
URL: https://github.com/apache/pinot/pull/8979#discussion_r973875208


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByOperator.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for order-by queries that are partially sorted over the sorting 
keys.
+ */
+public class SelectionPartiallyOrderedByOperator extends 
BaseOperator<IntermediateResultsBlock> {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDERBY";
+
+  private final IndexSegment _indexSegment;
+
+  // Deduped order-by expressions followed by output expressions from 
SelectionOperatorUtils.extractExpressions()
+  private final List<ExpressionContext> _expressions;
+  private final List<ExpressionContext> _alreadySorted;
+  private final List<ExpressionContext> _toSort;
+
+  private final TransformOperator _transformOperator;
+  private final List<OrderByExpressionContext> _orderByExpressions;
+  private final TransformResultMetadata[] _expressionsMetadata;
+  private final int _numRowsToKeep;
+  private final PartiallySortedListBuilder _sorter;
+
+  private int _numDocsScanned = 0;
+  private long _numEntriesScannedPostFilter = 0;
+  private boolean _used = false;
+  private Comparator<Object[]> _comparator;
+
+  public SelectionPartiallyOrderedByOperator(IndexSegment indexSegment, 
QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator 
transformOperator, int sortedExpr) {
+    Preconditions.checkArgument(sortedExpr > 0,
+        "This operator should not be used when sorting expressions doesn't 
start with sorted expressions");
+    _indexSegment = indexSegment;
+    _expressions = expressions;
+    _transformOperator = transformOperator;
+
+    _orderByExpressions = queryContext.getOrderByExpressions();
+    assert _orderByExpressions != null;
+    int numOrderByExpressions = _orderByExpressions.size();
+
+    _alreadySorted = expressions.subList(0, sortedExpr);
+    _toSort = expressions.subList(sortedExpr, numOrderByExpressions);
+
+    Preconditions.checkArgument(!_toSort.isEmpty(),
+        "This operator should not be used when all sorting expressions are 
sorted. "
+            + "Use %s instead", SelectionOrderByOperator.class.getName());
+
+    _expressionsMetadata = new TransformResultMetadata[_expressions.size()];
+    for (int i = 0; i < _expressionsMetadata.length; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      _expressionsMetadata[i] = 
_transformOperator.getResultMetadata(expression);
+    }
+
+    _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+
+    int expectedSize = Math.min(_numRowsToKeep * 2, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+    Comparator<Object[]> sortedComparator =
+        SelectionOrderByOperator.getComparator(_orderByExpressions, 
_expressionsMetadata, true, 0, sortedExpr);
+    Comparator<Object[]> unsortedComparator =
+        SelectionOrderByOperator.getComparator(_orderByExpressions, 
_expressionsMetadata, true, sortedExpr,
+            numOrderByExpressions);
+    _sorter = new PartiallySortedListBuilder(expectedSize, sortedComparator, 
unsortedComparator);
+    _comparator = SelectionOrderByOperator.getComparator(_orderByExpressions, 
_expressionsMetadata, false);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.singletonList(_transformOperator);
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    if (!_used) {
+      _used = true;
+      fetch();
+
+      DataSchema dataSchema = createDataSchema();
+
+      return new IntermediateResultsBlock.ListSelection(dataSchema, 
_sorter.build(), _comparator);
+    }
+    return null;
+  }
+
+  private DataSchema createDataSchema() {
+
+    int numExpressions = _expressions.size();
+
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < columnNames.length; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+    }
+    for (int i = 0; i < numExpressions; i++) {
+      TransformResultMetadata expressionMetadata = _expressionsMetadata[i];
+      columnDataTypes[i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  /**
+   * Fetches the rows that are needed into {@link #_sorter} and updated the 
metrics.
+   *
+   * The sorter can still be used.
+   */
+  private void fetch() {
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+
+    TransformBlock transformBlock;
+    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
+    try {
+      while ((transformBlock = _transformOperator.nextBlock()) != null) {
+        for (int i = 0; i < numExpressions; i++) {
+          ExpressionContext expression = _expressions.get(i);
+          blockValSets[i] = transformBlock.getBlockValueSet(expression);
+        }
+        RowBasedBlockValueFetcher blockValueFetcher = new 
RowBasedBlockValueFetcher(blockValSets);
+        int numDocsFetched = transformBlock.getNumDocs();
+        _numDocsScanned += numDocsFetched;
+        for (int i = 0; i < numDocsFetched; i++) {
+          boolean newBlock = _sorter.add(blockValueFetcher.getRow(i));
+          if (newBlock && _sorter.sortedSize() >= _numRowsToKeep) {
+            // We changed to a new section to ordered sorted values and we 
have more values than required.
+            // Therefore, we can stop the execution here.
+            return;
+          }
+        }
+      }
+    } finally {
+      _numEntriesScannedPostFilter = (long) _numDocsScanned * 
numColumnsProjected;
+    }
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder sb = new StringBuilder(EXPLAIN_NAME);
+
+    sb.append("(sortedList: ");
+    concatList(sb, _alreadySorted);
+
+    sb.append(", unsortedList: ");
+    concatList(sb, _toSort);
+
+    sb.append(", rest: ");
+    concatList(sb, _expressions.subList(_alreadySorted.size() + 
_toSort.size(), _expressions.size()));
+
+    sb.append(')');
+    return sb.toString();
+  }
+
+  private void concatList(StringBuilder sb, List<?> list) {
+    sb.append('(');
+    Iterator<?> it = list.iterator();
+    if (it.hasNext()) {
+      sb.append(it.next());
+      while (it.hasNext()) {
+        sb.append(", ").append(it.next());
+      }
+    }
+    sb.append(')');
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    long numEntriesScannedInFilter = 
_transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, 
_numEntriesScannedPostFilter,
+        numTotalDocs);
+  }
+
+  /**
+   * A private class used to build a sorted list by adding partially sorted 
data.
+   *
+   * Specifically, this class has been designed to receive successive calls to 
{@link #add(Object[])} follow by a single
+   * call to {@link #build()}. Once this method is called, the behavior of 
this object is undefined and therefore it
+   * should not be used.
+   *
+   * Rows must be inserted in ascending order accordingly to the partial order 
specified by {@link #_sortedComparator}.
+   * This comparator will define <i>partitions</i> of rows. All the rows in 
the same partition same are considered equal
+   * by that comparator.
+   *
+   * There is a second comparator called {@link #_unsortedComparator} that is 
used to sort rows inside each partition.
+   * When calling {@link #add(Object[])} with a row that doesn't belong to the 
current partition, a new one is started
+   * and the previous one is sorted. Therefore, this object maintains the 
invariant that at any moment there are at
+   * least {@link #sortedSize()} elements that are completely sorted, which 
means that no matter which elements are
+   * added after that, these elements are going to be smallest.
+   */
+  private static class PartiallySortedListBuilder {
+    private final ArrayList<Object[]> _rows;
+    private final Comparator<Object[]> _sortedComparator;
+    private final Comparator<Object[]> _unsortedComparator;
+
+    public PartiallySortedListBuilder(int expectedSize, Comparator<Object[]> 
sortedComparator,
+        Comparator<Object[]> unsortedComparator) {
+      _rows = new ArrayList<>(expectedSize);
+      _sortedComparator = sortedComparator;
+      _unsortedComparator = unsortedComparator;
+    }
+
+    @Nullable
+    private Object[] _lastBlockRow;
+    private int _lastSorted;
+
+    /**
+     * Adds the given row to this object. The new column must be equal or 
higher than the last added row in relation to
+     * {@link #_sortedComparator}.
+     *
+     * @param row The row to add. The values of the already sorted columns 
must be equal or higher than the last added
+     *           row, if any.
+     * @return true if and only if the previous block was closed.
+     */
+    boolean add(Object[] row) {
+      boolean result;
+      if (_lastBlockRow == null) {
+        _lastSorted = 0;
+        _lastBlockRow = row;
+        result = false;
+      } else {
+        int cmp = _sortedComparator.compare(_lastBlockRow, row);
+        assert cmp >= 0 : "Row " + Arrays.toString(row) + " is lower than 
previously added row"

Review Comment:
   This assertion was replaced with an explicit `if` check because, on 
failure, the `AssertionError` (which extends `Error`, not `Exception`) is not caught by the Pinot job, so the error is lost.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to