This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6bd8a7dcff Enhance select order-by combine to use merge sort (#10357) 6bd8a7dcff is described below commit 6bd8a7dcfff67b6fb80f96a60e502d7a6df65336 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Mar 9 15:17:51 2023 -0800 Enhance select order-by combine to use merge sort (#10357) --- .../blocks/results/AggregationResultsBlock.java | 3 +- .../operator/blocks/results/BaseResultsBlock.java | 3 +- .../blocks/results/DistinctResultsBlock.java | 2 +- .../blocks/results/ExceptionResultsBlock.java | 4 +- .../blocks/results/ExplainResultsBlock.java | 3 +- .../blocks/results/GroupByResultsBlock.java | 2 +- .../blocks/results/MetadataResultsBlock.java | 4 +- .../blocks/results/SelectionResultsBlock.java | 47 ++++----- .../combine/BaseSingleBlockCombineOperator.java | 2 +- ...xValueBasedSelectionOrderByCombineOperator.java | 21 +--- .../combine/merger/ResultsBlockMerger.java | 10 -- .../merger/SelectionOnlyResultsBlockMerger.java | 7 +- .../merger/SelectionOrderByResultsBlockMerger.java | 15 +-- .../query/LinearSelectionOrderByOperator.java | 111 ++++++++++----------- .../operator/query/SelectionOrderByOperator.java | 22 ++-- .../query/selection/SelectionOperatorService.java | 7 +- .../query/selection/SelectionOperatorUtils.java | 69 +++++++++---- .../core/query/utils/OrderByComparatorFactory.java | 14 +-- .../combine/SelectionCombineOperatorTest.java | 30 +++--- .../query/LinearSelectionOrderByOperatorTest.java | 26 ++--- .../selection/SelectionOperatorServiceTest.java | 40 ++++---- ...InnerSegmentSelectionMultiValueQueriesTest.java | 11 +- ...erSegmentSelectionMultiValueRawQueriesTest.java | 9 +- ...nnerSegmentSelectionSingleValueQueriesTest.java | 63 ++++++------ 24 files changed, 245 insertions(+), 280 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java index 54fa0b9558..b2bb5b7308 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java @@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.math.BigDecimal; -import java.util.Collection; import java.util.Collections; import java.util.List; import org.apache.pinot.common.datatable.DataTable; @@ -77,7 +76,7 @@ public class AggregationResultsBlock extends BaseResultsBlock { } @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { return Collections.singletonList(_results.toArray()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java index 9d119afce7..b611430d50 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java @@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -171,7 +170,7 @@ public abstract class BaseResultsBlock implements Block { * Returns the rows for the results. Return {@code null} when the block only contains metadata. */ @Nullable - public abstract Collection<Object[]> getRows(QueryContext queryContext); + public abstract List<Object[]> getRows(QueryContext queryContext); /** * Returns a data table without metadata or exception attached. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java index 99a0f868c6..0cbeebd176 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java @@ -62,7 +62,7 @@ public class DistinctResultsBlock extends BaseResultsBlock { } @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { List<Object[]> rows = new ArrayList<>(_distinctTable.size()); for (Record record : _distinctTable.getRecords()) { rows.add(record.getValues()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java index 783c7c22a4..02abb39b84 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.operator.blocks.results; -import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.exception.QueryException; @@ -51,7 +51,7 @@ public class ExceptionResultsBlock extends BaseResultsBlock { @Nullable @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { return null; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java index bb3e71958b..5789c9ba76 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java @@ -20,7 +20,6 @@ package org.apache.pinot.core.operator.blocks.results; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,7 +51,7 @@ public class ExplainResultsBlock extends BaseResultsBlock { } @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { List<Object[]> rows = new ArrayList<>(_entries.size()); for (ExplainEntry entry : _entries) { rows.add(entry.toRow()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index f431f4bd6f..7e5057b4d4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -149,7 +149,7 @@ public class GroupByResultsBlock extends BaseResultsBlock { @Nullable @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { if (_table == null) { return Collections.emptyList(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java index 3ef747d2a1..97b941f3d6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.operator.blocks.results; -import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; @@ -41,7 +41,7 @@ public class MetadataResultsBlock extends BaseResultsBlock { @Nullable @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { return null; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java index 363cd1a4f5..b19c9898c6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java @@ -18,12 +18,9 @@ */ package org.apache.pinot.core.operator.blocks.results; -import com.google.common.base.Preconditions; import java.io.IOException; -import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.PriorityQueue; import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; @@ -36,32 +33,37 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils; */ public class SelectionResultsBlock extends BaseResultsBlock { private final DataSchema _dataSchema; - private final Collection<Object[]> _rows; private final Comparator<? super Object[]> _comparator; + private List<Object[]> _rows; - public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) { - this(dataSchema, rows, null); - } - - public SelectionResultsBlock(DataSchema dataSchema, PriorityQueue<Object[]> rows) { - this(dataSchema, rows, rows.comparator()); - } - - public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]> rows, + public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows, @Nullable Comparator<? super Object[]> comparator) { _dataSchema = dataSchema; _rows = rows; _comparator = comparator; } + public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) { + this(dataSchema, rows, null); + } + public DataSchema getDataSchema() { return _dataSchema; } - public Collection<Object[]> getRows() { + public List<Object[]> getRows() { return _rows; } + public void setRows(List<Object[]> rows) { + _rows = rows; + } + + @Nullable + public Comparator<? super Object[]> getComparator() { + return _comparator; + } + @Override public int getNumRows() { return _rows.size(); @@ -73,25 +75,10 @@ public class SelectionResultsBlock extends BaseResultsBlock { } @Override - public Collection<Object[]> getRows(QueryContext queryContext) { + public List<Object[]> getRows(QueryContext queryContext) { return _rows; } - public SelectionResultsBlock convertToPriorityQueueBased() { - if (_rows instanceof PriorityQueue) { - return this; - } - Preconditions.checkState(_comparator != null, "No comparator specified in the results block"); - PriorityQueue<Object[]> result = new PriorityQueue<>(_comparator); - result.addAll(_rows); - return new SelectionResultsBlock(_dataSchema, result); - } - - public PriorityQueue<Object[]> getRowsAsPriorityQueue() { - Preconditions.checkState(_rows instanceof PriorityQueue, "The results block is not backed by a priority queue"); - return (PriorityQueue<Object[]>) _rows; - } - @Override public DataTable getDataTable(QueryContext queryContext) throws IOException { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java index adfeb710fc..7366a61b69 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java @@ -148,7 +148,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock> return blockToMerge; } if (mergedBlock == null) { - mergedBlock = _resultsBlockMerger.convertToMergeableBlock((T) blockToMerge); + mergedBlock = (T) blockToMerge; } else { _resultsBlockMerger.mergeResultsBlocks(mergedBlock, (T) blockToMerge); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index f33be5538e..666332d5ac 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -285,17 +285,16 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator return blockToMerge; } if (mergedBlock == null) { - mergedBlock = convertToMergeableBlock((SelectionResultsBlock) blockToMerge); + mergedBlock = (SelectionResultsBlock) blockToMerge; } else { mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge); } numBlocksMerged++; // Update the boundary value if enough rows are collected - PriorityQueue<Object[]> selectionResult = mergedBlock.getRowsAsPriorityQueue(); - if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { - assert selectionResult.peek() != null; - _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]); + List<Object[]> rows = mergedBlock.getRows(); + if (rows.size() == _numRowsToKeep) { + _globalBoundaryValue.set((Comparable) rows.get(_numRowsToKeep - 1)[0]); } } return mergedBlock; @@ -315,17 +314,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); return; } - - PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue(); - Collection<Object[]> rowsToMerge = blockToMerge.getRows(); - assert mergedRows != null && rowsToMerge != null; - SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep); - } - - protected SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) { - // This may create a copy or return the same instance. Anyway, this operator is the owner of the - // value now, so it can mutate it. - return resultsBlock.convertToPriorityQueueBased(); + SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep); } private static class MinMaxValueContext { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java index 0c77e66c93..318dfd2bd0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java @@ -44,14 +44,4 @@ public interface ResultsBlockMerger<T extends BaseResultsBlock> { default boolean isQuerySatisfied(T resultsBlock) { return false; } - - /** - * Converts the given results block into a mergeable results block. - * - * <p>This conversion is necessary if a block is used as the first argument for: - * {@link ResultsBlockMerger#mergeResultsBlocks(BaseResultsBlock, BaseResultsBlock)}. - */ - default T convertToMergeableBlock(T resultsBlock) { - return resultsBlock; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java index 54b6185be1..aec95823c8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.operator.combine.merger; -import java.util.Collection; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; @@ -56,10 +55,6 @@ public class SelectionOnlyResultsBlockMerger implements ResultsBlockMerger<Selec QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); return; } - - Collection<Object[]> mergedRows = mergedBlock.getRows(); - Collection<Object[]> rowsToMerge = blockToMerge.getRows(); - assert mergedRows != null && rowsToMerge != null; - SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep); + SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, _numRowsToKeep); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java index 569b2b8e3b..a5483853cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java @@ -18,8 +18,6 @@ */ package org.apache.pinot.core.operator.combine.merger; -import java.util.Collection; -import java.util.PriorityQueue; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; @@ -52,17 +50,6 @@ public class SelectionOrderByResultsBlockMerger implements ResultsBlockMerger<Se QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); return; } - - PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue(); - Collection<Object[]> rowsToMerge = blockToMerge.getRows(); - assert mergedRows != null && rowsToMerge != null; - SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep); - } - - @Override - public SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) { - // This may create a copy or return the same instance. Anyway, this operator is the owner of the - // value now, so it can mutate it. - return resultsBlock.convertToPriorityQueueBased(); + SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java index e9fe801262..c86e5a114d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java @@ -74,13 +74,8 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select protected final List<OrderByExpressionContext> _orderByExpressions; protected final TransformResultMetadata[] _expressionsMetadata; protected final int _numRowsToKeep; - private final Supplier<ListBuilder> _listBuilderSupplier; - protected boolean _used = false; - /** - * The comparator used to build the resulting {@link SelectionResultsBlock}, which sorts rows in reverse order to the - * one specified in the query. - */ - protected Comparator<Object[]> _comparator; + protected final Comparator<Object[]> _comparator; + protected final Supplier<ListBuilder> _listBuilderSupplier; /** * @param expressions Order-by expressions must be at the head of the list. @@ -107,21 +102,20 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select } _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit(); + _comparator = + OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled); if (_toSort.isEmpty()) { _listBuilderSupplier = () -> new TotallySortedListBuilder(_numRowsToKeep); } else { Comparator<Object[]> sortedComparator = - OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, false, _nullHandlingEnabled, - 0, numSortedExpressions); + OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled, 0, + numSortedExpressions); Comparator<Object[]> unsortedComparator = - OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled, + OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled, numSortedExpressions, numOrderByExpressions); _listBuilderSupplier = () -> new PartiallySortedListBuilder(_numRowsToKeep, sortedComparator, unsortedComparator); } - - _comparator = - OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled); } @Override @@ -221,17 +215,7 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select @Override protected SelectionResultsBlock getNextBlock() { - Preconditions.checkState(!_used, "nextBlock was called more than once"); - _used = true; - List<Object[]> list = fetch(_listBuilderSupplier); - - DataSchema dataSchema = createDataSchema(); - - if (list.size() > _numRowsToKeep) { - list = new ArrayList<>(list.subList(0, _numRowsToKeep)); - } - - return new SelectionResultsBlock(dataSchema, list, _comparator); + return new SelectionResultsBlock(createDataSchema(), fetch(_listBuilderSupplier), _comparator); } protected DataSchema createDataSchema() { @@ -335,25 +319,28 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select */ @VisibleForTesting static class PartiallySortedListBuilder implements ListBuilder { - /** - * A list with all the elements that have been already sorted. - */ - private final ArrayList<Object[]> _sorted; - /** - * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows. - */ - private PriorityQueue<Object[]> _lastPartitionQueue; + + private final int _maxNumRows; + /** * The comparator that defines the partitions and the one that impose in which order add has to be called. */ private final Comparator<Object[]> _partitionComparator; + /** - * The comparator that sorts different rows on each partition, which sorts rows in reverse order to the one - * specified in the query. + * The comparator that sorts different rows on each partition. */ private final Comparator<Object[]> _unsortedComparator; - private final int _maxNumRows; + /** + * List of rows, where the first _numSortedRows are sorted. + */ + private final ArrayList<Object[]> _rows; + + /** + * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows. + */ + private PriorityQueue<Object[]> _lastPartitionQueue; private Object[] _lastPartitionRow; private int _numSortedRows; @@ -361,50 +348,52 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select public PartiallySortedListBuilder(int maxNumRows, Comparator<Object[]> partitionComparator, Comparator<Object[]> unsortedComparator) { _maxNumRows = maxNumRows; - _sorted = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY)); _partitionComparator = partitionComparator; _unsortedComparator = unsortedComparator; + _rows = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY)); } @Override public boolean add(Object[] row) { if (_lastPartitionRow == null) { _lastPartitionRow = row; - _sorted.add(row); + _rows.add(row); return false; } - int cmp = _partitionComparator.compare(row, _lastPartitionRow); - if (cmp < 0) { - throw new IllegalArgumentException( - "Row with docId " + _sorted.size() + " is not sorted compared to the previous one"); - } + int compareResult = _partitionComparator.compare(row, _lastPartitionRow); + Preconditions.checkState(compareResult >= 0, "Rows are not sorted"); - boolean newPartition = cmp > 0; - if (_sorted.size() < _maxNumRows) { + boolean newPartition = compareResult > 0; + int numRows = _rows.size(); + if (numRows < _maxNumRows) { // we don't have enough rows yet if (newPartition) { _lastPartitionRow = row; - _numSortedRows = _sorted.size(); + if (numRows - _numSortedRows > 1) { + _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator); + } + _numSortedRows = numRows; } // just add the new row to the result list - _sorted.add(row); + _rows.add(row); return false; } // enough rows have been collected - assert _sorted.size() == _maxNumRows; - if (newPartition) { // and the new element belongs to a new partition, so we can just ignore it + assert numRows == _maxNumRows; + if (newPartition) { + // new element belongs to a new partition, so we can just ignore it return true; } // new element doesn't belong to a new partition, so we may need to add it - if (_lastPartitionQueue == null) { // we have exactly _numRows rows, and the new belongs to the last partition + if (_lastPartitionQueue == null) { // we need to prepare the priority queue - int numRowsInPriorityQueue = _maxNumRows - _numSortedRows; - _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator); - _lastPartitionQueue.addAll(_sorted.subList(_numSortedRows, _maxNumRows)); + int numRowsInPriorityQueue = numRows - _numSortedRows; + _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator.reversed()); + _lastPartitionQueue.addAll(_rows.subList(_numSortedRows, numRows)); } // add the new element if it is lower than the greatest element stored in the partition - if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) > 0) { + if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) < 0) { _lastPartitionQueue.poll(); _lastPartitionQueue.offer(row); } @@ -413,14 +402,18 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select @Override public List<Object[]> build() { - if (_lastPartitionQueue != null) { - assert _lastPartitionQueue.size() == _maxNumRows - _numSortedRows; - Iterator<Object[]> lastPartitionIt = _lastPartitionQueue.iterator(); - for (int i = _numSortedRows; i < _maxNumRows; i++) { - _sorted.set(i, lastPartitionIt.next()); + int numRows = _rows.size(); + if (_lastPartitionQueue == null) { + if (numRows - _numSortedRows > 1) { + _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator); + } + } else { + assert numRows == _maxNumRows && _lastPartitionQueue.size() == numRows - _numSortedRows; + for (int i = numRows - 1; i >= _numSortedRows; i--) { + _rows.set(i, _lastPartitionQueue.poll()); } } - return _sorted; + return _rows; } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index 107cbe0d7b..9c6702a481 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.query; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -80,6 +81,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock private final List<OrderByExpressionContext> _orderByExpressions; private final TransformResultMetadata[] _orderByExpressionMetadata; private final int _numRowsToKeep; + private final Comparator<Object[]> _comparator; private final PriorityQueue<Object[]> _rows; private int _numDocsScanned = 0; @@ -103,11 +105,10 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock } _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit(); - Comparator<Object[]> comparator = - OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, true, - _nullHandlingEnabled); + _comparator = + OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, _nullHandlingEnabled); _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - comparator); + _comparator.reversed()); } @Override @@ -183,7 +184,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock } DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); - return new SelectionResultsBlock(dataSchema, _rows); + return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator); } /** @@ -303,7 +304,16 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock } DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); - return new SelectionResultsBlock(dataSchema, _rows); + return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator); + } + + private List<Object[]> getSortedRows() { + int numRows = _rows.size(); + Object[][] sortedRows = new Object[numRows][]; + for (int i = numRows - 1; i >= 0; i--) { + sortedRows[i] = _rows.poll(); + } + return Arrays.asList(sortedRows); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java index ff1ab76533..419a4df883 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java @@ -55,7 +55,6 @@ import org.roaringbitmap.RoaringBitmap; * </li> * </ul> */ -@SuppressWarnings("rawtypes") public class SelectionOperatorService { private final QueryContext _queryContext; private final List<String> _selectionColumns; @@ -65,7 +64,7 @@ public class SelectionOperatorService { private final PriorityQueue<Object[]> _rows; /** - * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Inter segment) + * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Broker side) * * @param queryContext Selection order-by query * @param dataSchema data schema. @@ -78,6 +77,8 @@ public class SelectionOperatorService { _offset = queryContext.getOffset(); _numRowsToKeep = _offset + queryContext.getLimit(); assert queryContext.getOrderByExpressions() != null; + // TODO: Do not use type compatible comparator for performance since we don't support different data schema on + // server side combine _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema, _queryContext.isNullHandlingEnabled())); @@ -95,6 +96,8 @@ public class SelectionOperatorService { /** * Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>. * (Broker side) + * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted + * Can also consider adding a data table metadata to indicate whether the server side results are sorted */ public void reduceWithOrdering(Collection<DataTable> dataTables, boolean nullHandlingEnabled) { for (DataTable dataTable : dataTables) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java index 8ef2d405c3..dcb3cec6e6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -40,6 +39,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.trace.Tracing; @@ -192,37 +192,70 @@ public class SelectionOperatorUtils { /** * Merge two partial results for selection queries without <code>ORDER BY</code>. (Server side) * - * @param mergedRows partial results 1. - * @param rowsToMerge partial results 2. + * @param mergedBlock partial results 1. + * @param blockToMerge partial results 2. * @param selectionSize size of the selection. */ - public static void mergeWithoutOrdering(Collection<Object[]> mergedRows, Collection<Object[]> rowsToMerge, + public static void mergeWithoutOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge, int selectionSize) { - Iterator<Object[]> iterator = rowsToMerge.iterator(); - int numMergedRows = 0; - while (mergedRows.size() < selectionSize && iterator.hasNext()) { - mergedRows.add(iterator.next()); - Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows); - numMergedRows++; + List<Object[]> mergedRows = mergedBlock.getRows(); + List<Object[]> rowsToMerge = blockToMerge.getRows(); + int numRowsToMerge = Math.min(selectionSize - mergedRows.size(), rowsToMerge.size()); + if (numRowsToMerge > 0) { + mergedRows.addAll(rowsToMerge.subList(0, numRowsToMerge)); } } /** * Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side) - * TODO: Should use type compatible comparator to compare the rows * - * @param mergedRows partial results 1. - * @param rowsToMerge partial results 2. + * @param mergedBlock partial results 1 (sorted). + * @param blockToMerge partial results 2 (sorted). * @param maxNumRows maximum number of rows need to be stored. */ - public static void mergeWithOrdering(PriorityQueue<Object[]> mergedRows, Collection<Object[]> rowsToMerge, + public static void mergeWithOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge, int maxNumRows) { + List<Object[]> sortedRows1 = mergedBlock.getRows(); + List<Object[]> sortedRows2 = blockToMerge.getRows(); + Comparator<? super Object[]> comparator = mergedBlock.getComparator(); + assert comparator != null; + int numSortedRows1 = sortedRows1.size(); + int numSortedRows2 = sortedRows2.size(); + if (numSortedRows1 == 0) { + mergedBlock.setRows(sortedRows2); + return; + } + if (numSortedRows2 == 0 || (numSortedRows1 == maxNumRows + && comparator.compare(sortedRows1.get(numSortedRows1 - 1), sortedRows2.get(0)) <= 0)) { + return; + } + int numRowsToMerge = Math.min(numSortedRows1 + numSortedRows2, maxNumRows); + List<Object[]> mergedRows = new ArrayList<>(numRowsToMerge); + int i1 = 0; + int i2 = 0; int numMergedRows = 0; - for (Object[] row : rowsToMerge) { - addToPriorityQueue(row, mergedRows, maxNumRows); - Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows); - numMergedRows++; + while (i1 < numSortedRows1 && i2 < numSortedRows2 && numMergedRows < numRowsToMerge) { + Object[] row1 = sortedRows1.get(i1); + Object[] row2 = sortedRows2.get(i2); + if (comparator.compare(row1, row2) <= 0) { + mergedRows.add(row1); + i1++; + } else { + mergedRows.add(row2); + i2++; + } + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows++); + } + if (numMergedRows < numRowsToMerge) { + if (i1 < numSortedRows1) { + assert i2 == numSortedRows2; + mergedRows.addAll(sortedRows1.subList(i1, i1 + numRowsToMerge - numMergedRows)); + } else { + assert i1 == numSortedRows1; + mergedRows.addAll(sortedRows2.subList(i2, i2 + numRowsToMerge - numMergedRows)); + } } + mergedBlock.setRows(mergedRows); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java index 4258317e76..5d25cc72a2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java @@ -38,17 +38,13 @@ public class OrderByComparatorFactory { } public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions, - TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled) { - return getComparator(orderByExpressions, orderByExpressionMetadata, reverse, nullHandlingEnabled, 0, + TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled) { + return getComparator(orderByExpressions, orderByExpressionMetadata, nullHandlingEnabled, 0, orderByExpressions.size()); } - /** - * @param reverse if false, the comparator will order in the direction indicated by the - * {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the opposite direction. - */ public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions, - TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled, int from, + TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled, int from, int to) { Preconditions.checkArgument(to <= orderByExpressions.size(), "Trying to access %sth position of orderByExpressions with size %s", to, orderByExpressions.size()); @@ -76,13 +72,11 @@ public class OrderByComparatorFactory { FieldSpec.DataType[] storedTypes = new FieldSpec.DataType[numValuesToCompare]; // Use multiplier -1 or 1 to control ascending/descending order int[] multipliers = new int[numValuesToCompare]; - int ascMult = reverse ? -1 : 1; - int descMult = reverse ? 1 : -1; for (int i = 0; i < numValuesToCompare; i++) { int valueIndex = valueIndexList.get(i); valueIndices[i] = valueIndex; storedTypes[i] = orderByExpressionMetadata[valueIndex].getDataType().getStoredType(); - multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? ascMult : descMult; + multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? 1 : -1; } if (nullHandlingEnabled) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java index 88b81d5b88..f7a73647fa 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; @@ -224,12 +223,11 @@ public class SelectionCombineOperatorTest { SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn"); assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); - PriorityQueue<Object[]> selectionResult = combineResult.getRowsAsPriorityQueue(); - assertNotNull(selectionResult); - assertEquals(selectionResult.size(), 10); - int expectedValue = 9; - while (!selectionResult.isEmpty()) { - assertEquals((int) selectionResult.poll()[0], expectedValue--); + List<Object[]> rows = combineResult.getRows(); + assertNotNull(rows); + assertEquals(rows.size(), 10); + for (int i = 0; i < 10; i++) { + assertEquals((int) rows.get(i)[0], i); } // Should early-terminate after processing the result of the first segment. Each thread should process at most 1 // segment. @@ -248,12 +246,12 @@ public class SelectionCombineOperatorTest { combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC"); assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); - selectionResult = combineResult.getRowsAsPriorityQueue(); - assertNotNull(selectionResult); - assertEquals(selectionResult.size(), 10); - expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40; - while (!selectionResult.isEmpty()) { - assertEquals((int) selectionResult.poll()[0], expectedValue++); + rows = combineResult.getRows(); + assertNotNull(rows); + assertEquals(rows.size(), 10); + int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49; + for (int i = 0; i < 10; i++) { + assertEquals((int) rows.get(i)[0], expectedValue - i); } // Should early-terminate after processing the result of the first segment. Each thread should process at most 1 // segment. @@ -272,9 +270,9 @@ public class SelectionCombineOperatorTest { combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000"); assertEquals(combineResult.getDataSchema(), new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); - selectionResult = combineResult.getRowsAsPriorityQueue(); - assertNotNull(selectionResult); - assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + rows = combineResult.getRows(); + assertNotNull(rows); + assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); // Should not early-terminate numDocsScanned = combineResult.getNumDocsScanned(); assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java index b7e2a2b6de..b7ed828aa2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java @@ -19,9 +19,7 @@ package org.apache.pinot.core.operator.query; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.PartiallySortedListBuilder; import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.TotallySortedListBuilder; import org.testng.annotations.Test; @@ -67,7 +65,7 @@ public class LinearSelectionOrderByOperatorTest { public void testPartiallySortedListBuilder() { int maxNumRows = 10; Comparator<Object[]> partitionComparator = Comparator.comparingInt(row -> (Integer) row[0]); - Comparator<Object[]> unsortedComparator = (row1, row2) -> Integer.compare((Integer) row2[1], (Integer) row1[1]); + Comparator<Object[]> unsortedComparator = Comparator.comparingInt(row -> (Integer) row[1]); // Enough rows collected without tie rows PartiallySortedListBuilder listBuilder = @@ -81,7 +79,7 @@ public class LinearSelectionOrderByOperatorTest { List<Object[]> rows = listBuilder.build(); assertEquals(rows.size(), maxNumRows); for (int i = 0; i < maxNumRows; i++) { - assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i}); + assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1}); } // Enough rows collected with tie rows @@ -98,19 +96,13 @@ public class LinearSelectionOrderByOperatorTest { rows = listBuilder.build(); assertEquals(rows.size(), maxNumRows); // For the last partition, should contain unsorted value 0 and 1 - Set<Integer> unsortedValues = new HashSet<>(); for (int i = 0; i < maxNumRows; i++) { if (i / 2 != lastPartitionValue) { - assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i}); + assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1}); } else { - Object[] row = rows.get(i); - assertEquals(row[0], lastPartitionValue); - int unsortedValue = (int) row[1]; - assertTrue(unsortedValue == 0 || unsortedValue == 1); - unsortedValues.add(unsortedValue); + assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2}); } } - assertEquals(unsortedValues.size(), 2); // Not enough rows collected with tie rows listBuilder = new PartiallySortedListBuilder(maxNumRows, partitionComparator, unsortedComparator); @@ -125,18 +117,12 @@ public class LinearSelectionOrderByOperatorTest { rows = listBuilder.build(); assertEquals(rows.size(), maxNumRows); // For the last partition, should contain unsorted value 0 and 1 - unsortedValues = new HashSet<>(); for (int i = 0; i < maxNumRows; i++) { if (i / 2 != lastPartitionValue) { - assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i}); + assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1}); } else { - Object[] row = rows.get(i); - assertEquals(row[0], lastPartitionValue); - int unsortedValue = (int) row[1]; - assertTrue(unsortedValue == 0 || unsortedValue == 1); - unsortedValues.add(unsortedValue); + assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2}); } } - assertEquals(unsortedValues.size(), 2); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java index df95b6a5e1..cdae8a37e6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java @@ -22,12 +22,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.PriorityQueue; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.spi.IndexSegment; @@ -38,6 +39,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -168,13 +170,15 @@ public class SelectionOperatorServiceTest { @Test public void testCompatibleRowsMergeWithoutOrdering() { - ArrayList<Object[]> mergedRows = new ArrayList<>(2); + List<Object[]> mergedRows = new ArrayList<>(2); mergedRows.add(_row1); mergedRows.add(_row2); - Collection<Object[]> rowsToMerge = new ArrayList<>(2); + SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, mergedRows); + List<Object[]> rowsToMerge = new ArrayList<>(2); rowsToMerge.add(_compatibleRow1); rowsToMerge.add(_compatibleRow2); - SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, 3); + SelectionResultsBlock blockToMerge = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge); + SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, 3); assertEquals(mergedRows.size(), 3); assertSame(mergedRows.get(0), _row1); assertSame(mergedRows.get(1), _row2); @@ -183,21 +187,23 @@ public class SelectionOperatorServiceTest { @Test public void testCompatibleRowsMergeWithOrdering() { - SelectionOperatorService selectionOperatorService = new SelectionOperatorService(_queryContext, _dataSchema); - PriorityQueue<Object[]> mergedRows = selectionOperatorService.getRows(); + assertNotNull(_queryContext.getOrderByExpressions()); + Comparator<Object[]> comparator = + SelectionOperatorUtils.getTypeCompatibleComparator(_queryContext.getOrderByExpressions(), _dataSchema, + _queryContext.isNullHandlingEnabled()).reversed(); int maxNumRows = _queryContext.getOffset() + _queryContext.getLimit(); - Collection<Object[]> rowsToMerge1 = new ArrayList<>(2); - rowsToMerge1.add(_row1); - rowsToMerge1.add(_row2); - SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge1, maxNumRows); - Collection<Object[]> rowsToMerge2 = new ArrayList<>(2); - rowsToMerge2.add(_compatibleRow1); - rowsToMerge2.add(_compatibleRow2); - SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge2, maxNumRows); + SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, Collections.emptyList(), comparator); + List<Object[]> rowsToMerge1 = Arrays.asList(_row2, _row1); + SelectionResultsBlock blockToMerge1 = new SelectionResultsBlock(_dataSchema, rowsToMerge1, comparator); + SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge1, maxNumRows); + List<Object[]> rowsToMerge2 = Arrays.asList(_compatibleRow2, _compatibleRow1); + SelectionResultsBlock blockToMerge2 = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge2, comparator); + SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge2, maxNumRows); + List<Object[]> mergedRows = mergedBlock.getRows(); assertEquals(mergedRows.size(), 3); - assertSame(mergedRows.poll(), _compatibleRow1); - assertSame(mergedRows.poll(), _row2); - assertSame(mergedRows.poll(), _compatibleRow2); + assertSame(mergedRows.get(0), _compatibleRow2); + assertSame(mergedRows.get(1), _row2); + assertSame(mergedRows.get(2), _compatibleRow1); } @Test diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java index 876baf7a98..a2c62d888d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java @@ -21,7 +21,6 @@ package org.apache.pinot.queries; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.ExecutionStatistics; @@ -175,7 +174,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY); - selectionResult = (List<Object[]>) resultsBlock.getRows(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); firstRow = selectionResult.get(0); assertEquals(firstRow.length, 3); @@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY); - PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ"); assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252}); @@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY); - selectionResult = resultsBlock.getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ"); assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647}); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java index 9e3abdc929..c28e8d9d9a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java @@ -21,7 +21,6 @@ package org.apache.pinot.queries; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.ExecutionStatistics; @@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY); - PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ"); assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252}); @@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY); - selectionResult = resultsBlock.getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ"); assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647}); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java index 9506bcbb9f..92d24fc72c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java @@ -21,7 +21,6 @@ package org.apache.pinot.queries; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -108,7 +107,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("daysSinceEpoch")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("daysSinceEpoch")), ColumnDataType.INT); - PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); for (Object[] row : selectionResult) { assertEquals(row.length, 1); @@ -232,9 +231,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515); assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595); @@ -256,9 +255,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - selectionResult = resultsBlock.getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 4); assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515); assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197); @@ -281,9 +280,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING}); - PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 3); assertEquals(lastRow[0], "gFuH"); @@ -302,9 +301,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 3); assertEquals(lastRow[0], "gFuH"); @@ -322,9 +321,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue dataSchema = resultsBlock.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 2); assertEquals(lastRow[0], "gFuH"); assertEquals(lastRow[1], 126164076); @@ -343,9 +342,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue dataSchema = resultsBlock.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 2); assertEquals(lastRow[0], "gFuH"); assertEquals(lastRow[1], 167572854); @@ -364,9 +363,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue dataSchema = resultsBlock.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 2); assertEquals(lastRow[0], "gFuH"); assertEquals(lastRow[1], 167572854); @@ -386,9 +385,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 3); assertEquals(lastRow[0], "gFuH"); // Unsorted column values should be the same as ordering by their own @@ -410,9 +409,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"}); assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT}); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 3); assertEquals(lastRow[0], "gFuH"); // Unsorted column values should be the same as ordering by their own @@ -440,9 +439,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 11); assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515); assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595); @@ -465,9 +464,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - selectionResult = resultsBlock.getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 11); assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515); assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197); @@ -493,9 +492,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(selectionDataSchema.size(), 11); assertTrue(columnIndexMap.containsKey("column5")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING); - PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(9); assertEquals(lastRow.length, 11); assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH"); @@ -515,9 +514,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertEquals(selectionDataSchema.size(), 11); assertTrue(columnIndexMap.containsKey("column5")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING); - selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 10); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(9); assertEquals(lastRow.length, 11); assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH"); } @@ -544,9 +543,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue(); + List<Object[]> selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 12000); - Object[] lastRow = selectionResult.peek(); + Object[] lastRow = selectionResult.get(11999); assertEquals(lastRow.length, 11); assertEquals((int) lastRow[columnIndexMap.get("column6")], 296467636); assertEquals((int) lastRow[columnIndexMap.get("column1")], 1715964282); @@ -569,9 +568,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue assertTrue(columnIndexMap.containsKey("column1")); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT); assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT); - selectionResult = resultsBlock.getRowsAsPriorityQueue(); + selectionResult = resultsBlock.getRows(); assertEquals(selectionResult.size(), 6129); - lastRow = selectionResult.peek(); + lastRow = selectionResult.get(6128); assertEquals(lastRow.length, 11); assertEquals((int) lastRow[columnIndexMap.get("column6")], 499968041); assertEquals((int) lastRow[columnIndexMap.get("column1")], 335520083); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org