Jackie-Jiang commented on code in PR #8979: URL: https://github.com/apache/pinot/pull/8979#discussion_r925008347
########## pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java: ########## @@ -80,9 +81,8 @@ public IntermediateResultsBlock() { /** * Constructor for selection result. */ - public IntermediateResultsBlock(DataSchema dataSchema, Collection<Object[]> selectionResult) { + public IntermediateResultsBlock(DataSchema dataSchema) { Review Comment: The change in this class can be simplified by adding a `@Nullable Comparator<? super Object[]> comparator`; we can then add a new method `public PriorityQueue<Object[]> getSelectionResultAsPriorityQueue()`. I feel the current change is a little bit too complicated, and we might not want to extend this block. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java: ########## @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; + + +/** + * An operator for order-by queries DESC that are partially sorted over the sorting keys. + * + * @see LinearSelectionOrderByOperator + */ +public class SelectionPartiallyOrderedByDescOperation extends LinearSelectionOrderByOperator { + + private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_DESC"; + + private int _numDocsScanned = 0; + private long _numEntriesScannedPostFilter = 0; + + public SelectionPartiallyOrderedByDescOperation(IndexSegment indexSegment, QueryContext queryContext, + List<ExpressionContext> expressions, TransformOperator transformOperator, int sortedExpr) { + super(indexSegment, queryContext, expressions, transformOperator, sortedExpr); + assert queryContext.getOrderByExpressions() != null; + Preconditions.checkArgument(queryContext.getOrderByExpressions().stream() + .filter(expr -> expr.getExpression().getType() == ExpressionContext.Type.IDENTIFIER) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("The query is not order by identifiers")) + .isDesc(), + "%s can only be used when the first column in order by is DESC", EXPLAIN_NAME); + } + + @Override + protected void fetch(ListBuilder listBuilder) { + + List<TransformBlock> blocks = new ArrayList<>(); + + TransformBlock transformBlock; + int numColumnsProjected = _transformOperator.getNumColumnsProjected(); + try { + while ((transformBlock = _transformOperator.nextBlock()) != null) { 
+ int numDocsFetched = transformBlock.getNumDocs(); + _numDocsScanned += numDocsFetched; + blocks.add(transformBlock); Review Comment: (MAJOR) We cannot store all blocks here because the `ProjectionBlock` shares the same underlying `DataBlockCache`, where the previous values are overwritten. For descending order, the best we can do currently is to iterate over each block in reverse order, and keep up to `LIMIT` rows per block. When getting the next block, if we have gathered enough rows, we can throw away the rows from the previous block. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java: ########## @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.segment.spi.IndexSegment; + + +/** + * A selection Operator used when the first expressions in the order by are identifier expressions of columns that are + * already sorted (either ascendingly or descendingly), even if the tail of order by expressions are not sorted. + * + * ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column DESC LIMIT 10 OFFSET 5 + * or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column, not_sorted LIMIT 10 OFFSET 5 + * but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted, sorted_column LIMIT 10 OFFSET 5 + * + * Operators that derives from this class are going to have an almost linear cost instead of the usual NlogN when actual + * sorting must be done, where N is the number of rows in the segment. 
+ * There is a degraded scenario when the cost is actually NlogL (where L is the limit of the query) when all the first L + * rows have the exact same value for the prefix of the sorted columns. Even in that case, L should be quite smaller + * than N, so this implementation is algorithmically better than the normal solution. + */ +public abstract class LinearSelectionOrderByOperator extends BaseOperator<IntermediateResultsBlock> { + protected final IndexSegment _indexSegment; + + // Deduped order-by expressions followed by output expressions from SelectionOperatorUtils.extractExpressions() + protected final List<ExpressionContext> _expressions; + protected final List<ExpressionContext> _alreadySorted; + protected final List<ExpressionContext> _toSort; + + protected final TransformOperator _transformOperator; + protected final List<OrderByExpressionContext> _orderByExpressions; + protected final TransformResultMetadata[] _expressionsMetadata; + protected final int _numRowsToKeep; + private final ListBuilder _listBuilder; + protected boolean _used = false; + protected Comparator<Object[]> _comparator; + + /** + * + * @param expressions order by expressions must be at the head of the list. 
+ * @param sortedExpr How many expressions at the head of the expression list are going to be considered sorted by + * {@link #fetch(ListBuilder)} + */ + public LinearSelectionOrderByOperator(IndexSegment indexSegment, QueryContext queryContext, + List<ExpressionContext> expressions, TransformOperator transformOperator, + int sortedExpr) { + _indexSegment = indexSegment; + _expressions = expressions; + _transformOperator = transformOperator; + + _orderByExpressions = queryContext.getOrderByExpressions(); + assert _orderByExpressions != null; + int numOrderByExpressions = _orderByExpressions.size(); + + _alreadySorted = expressions.subList(0, sortedExpr); + _toSort = expressions.subList(sortedExpr, numOrderByExpressions); + + _expressionsMetadata = new TransformResultMetadata[_expressions.size()]; + for (int i = 0; i < _expressionsMetadata.length; i++) { + ExpressionContext expression = _expressions.get(i); + _expressionsMetadata[i] = _transformOperator.getResultMetadata(expression); + } + + _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit(); + + if (_toSort.isEmpty()) { + int expectedSize = Math.min(SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, _numRowsToKeep); + _listBuilder = new TotallySortedListBuilder(expectedSize); + } else { + int expectedSize = Math.min(SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, _numRowsToKeep * 2); + + Comparator<Object[]> sortedComparator = + OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, 0, sortedExpr); + Comparator<Object[]> unsortedComparator = + OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, sortedExpr, + numOrderByExpressions); + _listBuilder = new PartiallySortedListBuilder(expectedSize, sortedComparator, unsortedComparator); + } + + _comparator = OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, false); + } + + @Override + public IndexSegment getIndexSegment() { + return 
_indexSegment; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs(); + return new ExecutionStatistics(getNumDocsScanned(), numEntriesScannedInFilter, getNumEntriesScannedPostFilter(), + numTotalDocs); + } + + protected RowBasedBlockValueFetcher fetchBlock(TransformBlock transformBlock, BlockValSet[] blockValSets) { + int numExpressions = _expressions.size(); + + for (int i = 0; i < numExpressions; i++) { + ExpressionContext expression = _expressions.get(i); + blockValSets[i] = transformBlock.getBlockValueSet(expression); + } + return new RowBasedBlockValueFetcher(blockValSets); + } + + protected abstract long getNumEntriesScannedPostFilter(); + + protected abstract int getNumDocsScanned(); + + /** + * Modifies the given {@link ListBuilder} adding rows in an order compatible with the constructor used until the + * implementation considers that no more rows need to be added. 
+ */ + protected abstract void fetch(ListBuilder listBuilder); + + @Override + public List<Operator> getChildOperators() { + return Collections.singletonList(_transformOperator); + } + + protected abstract String getExplainName(); + + @Override + public String toExplainString() { + StringBuilder sb = new StringBuilder(getExplainName()); + + sb.append("(sortedList: "); + concatList(sb, _alreadySorted); + + sb.append(", unsortedList: "); + concatList(sb, _toSort); + + sb.append(", rest: "); + concatList(sb, _expressions.subList(_alreadySorted.size() + _toSort.size(), _expressions.size())); + + sb.append(')'); + return sb.toString(); + } + + private void concatList(StringBuilder sb, List<?> list) { + sb.append('('); + Iterator<?> it = list.iterator(); + if (it.hasNext()) { + sb.append(it.next()); + while (it.hasNext()) { + sb.append(", ").append(it.next()); + } + } + sb.append(')'); + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + Preconditions.checkState(!_used, "nextBlock was called more than once"); + _used = true; + fetch(_listBuilder); + + DataSchema dataSchema = createDataSchema(); + + List<Object[]> list = _listBuilder.build(); + if (list.size() > _numRowsToKeep) { + list = new ArrayList<>(list.subList(0, _numRowsToKeep)); + } + + return new IntermediateResultsBlock.ListSelection(dataSchema, list, _comparator); + } + + protected DataSchema createDataSchema() { + int numExpressions = _expressions.size(); + + // Create the data schema + String[] columnNames = new String[numExpressions]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions]; + for (int i = 0; i < columnNames.length; i++) { + columnNames[i] = _expressions.get(i).toString(); + } + for (int i = 0; i < numExpressions; i++) { + TransformResultMetadata expressionMetadata = _expressionsMetadata[i]; + columnDataTypes[i] = + DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue()); + } + 
return new DataSchema(columnNames, columnDataTypes); + } + + /** + * A private class used to build a sorted list by adding partially sorted data. + * + * Specifically, this class has been designed to receive successive calls to {@link #add(Object[])} follow by a single + * call to {@link #build()}. Once this method is called, the behavior of this object is undefined and therefore it + * should not be used. + * + * Rows must be inserted in ascending order accordingly to the partial order specified by a comparator. + * This comparator will define <i>partitions</i> of rows. All the rows in the same partition same are considered equal + * by that comparator. + * + * Some implementations, like {@link PartiallySortedListBuilder} accepts a second comparator that is used to sort rows + * inside each partition. + * When calling {@link #add(Object[])} with a row that doesn't belong to the current partition, a new one is started + * and the previous one is sorted. Therefore, this object maintains the invariant that at any moment there are at + * least {@link #sortedSize()} elements that are completely sorted, which means that no matter which elements are + * added after that, these elements are going to be smallest. + */ + protected interface ListBuilder { + + /** + * Adds the given row to this object. The new column must be equal or higher than the partition comparator. + * + * @param row The row to add. The values of the already sorted columns must be equal or higher than the last added + * row, if any. + * @return true if and only if the previous partition was closed. + */ + boolean add(Object[] row); + + /** + * Builds the sorted list. The returned list will be sorted, even if a previous call t o {@link #sortedSize()} + * returned a number lower than the number of elements that have been added. + * + * Once this method is called, the builder should not be used. There is no guaranteed on whether + * {@link #sortedSize()} is updated or not. 
+ */ + List<Object[]> build(); + + /** + * How many elements are actually sorted. This number is lower or equal to the number of elements that has been + * added. + */ + int sortedSize(); + } + + /** + * This is the faster {@link ListBuilder} but also the most restrictive one. It can only be used when data is inserted + * in total order. Therefore it cannot be used to implement order-by queries where there is at least one expression + * that is not sorted. In such case {@link PartiallySortedListBuilder} should be used. + * + * This implementation is just a wrapper over an ArrayList and therefore the average costs of its methods is constant. + */ + protected static class TotallySortedListBuilder implements ListBuilder { + private final ArrayList<Object[]> _list; + + public TotallySortedListBuilder(int expectedSize) { + _list = new ArrayList<>(expectedSize); + } + + @Override + public boolean add(Object[] row) { + _list.add(row); + return true; + } + + @Override + public List<Object[]> build() { + return _list; + } + + @Override + public int sortedSize() { + return _list.size(); + } + } + + /** + * This is the more versatile {@link ListBuilder} that requires two comparators: The first defines the order on which + * {@link #add(Object[])} is called and defines the partitions. The second one is used to sort rows in the same + * paritition. This class can be used to implement order-by queries that include one or more not sorted expressions. + * In cases where all expressions are sorted, {@link TotallySortedListBuilder} should be used because its performance + * is better. + * + * In this implementation the {@link #add(Object[])} method has a log(P) cost, where P is the number of elements in + * each partition. In normal cases we can consider P a constant. In some degenerated cases like having an ordering by + * a sorted column whose values are all equal, P may tend to the number of elements in the segment, which would make + * the cost of this method log(N). 
+ */ + private static class PartiallySortedListBuilder implements ListBuilder { + private final ArrayList<Object[]> _rows; + /** + * The comparator that defines the partitions and the one that impose in which order add has to be called. + */ + private final Comparator<Object[]> _partitionComparator; + /** + * The comparator that sorts different rows on each partition. + */ + private final Comparator<Object[]> _unsortedComparator; + @Nullable + private Object[] _lastPartitionRow; + /** + * The index where the current (and still not sorted) partition starts. + */ + private int _lastSorted; + + public PartiallySortedListBuilder(int expectedSize, Comparator<Object[]> partitionComparator, + Comparator<Object[]> unsortedComparator) { + _rows = new ArrayList<>(expectedSize); + _partitionComparator = partitionComparator; + _unsortedComparator = unsortedComparator; + } + + @Override + public boolean add(Object[] row) { + boolean result; + if (_lastPartitionRow == null) { + _lastSorted = 0; + _lastPartitionRow = row; + result = false; + } else { + int cmp = _partitionComparator.compare(row, _lastPartitionRow); + if (cmp < 0) { + throw new IllegalArgumentException("Row " + Arrays.toString(row) + " is lower than previously added row" + + Arrays.toString(_lastPartitionRow)); + } + if (cmp > 0) { Review Comment: In some corner cases, we might gather too many rows whose sorted columns all have the same value, and cause memory issues. We might want to use a priority queue and only keep `LIMIT` rows. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org