This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0d8432efbf [SSE] Remove redundant sorting when broker receives sorted block (#16195) 0d8432efbf is described below commit 0d8432efbfa59e6a7d90c01b2ffc0db2e595f131 Author: Song Fu <131259315+songw...@users.noreply.github.com> AuthorDate: Mon Jun 30 17:06:05 2025 -0700 [SSE] Remove redundant sorting when broker receives sorted block (#16195) --- .../apache/pinot/common/datatable/DataTable.java | 8 +- .../pinot/common/datatable/DataTableImplV4.java | 5 + .../pinot/common/datatable/DataTableUtils.java | 11 + .../blocks/results/SelectionResultsBlock.java | 13 + .../query/reduce/SelectionDataTableReducer.java | 3 +- .../query/selection/SelectionOperatorService.java | 251 ++++++- .../core/query/selection/SelectionOrderByTest.java | 735 +++++++++++++++++++++ .../pinot/query/runtime/operator/LeafOperator.java | 2 + 8 files changed, 987 insertions(+), 41 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index 6b0d70f657..bd39b76fd9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -93,7 +93,7 @@ public interface DataTable { DataTable toDataOnlyDataTable(); enum MetadataValueType { - INT, LONG, STRING + INT, LONG, STRING, BOOLEAN } /* The MetadataKey is used since V3, where we present metadata as Map<MetadataKey, String> @@ -144,11 +144,13 @@ public interface DataTable { MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING), NUM_GROUPS_WARNING_LIMIT_REACHED(35, "numGroupsWarningLimitReached", MetadataValueType.STRING), THREAD_MEM_ALLOCATED_BYTES(36, "threadMemAllocatedBytes", MetadataValueType.LONG), - RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", MetadataValueType.LONG); + RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", MetadataValueType.LONG), + // NOTE: for server after release 1.3.0 this flag is always set to true since servers now perform sorting + SORTED(38, "sorted", MetadataValueType.BOOLEAN); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! - private static final int MAX_ID = 37; + private static final int MAX_ID = 38; private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index f709f79a72..1a4aa9f503 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -549,6 +549,8 @@ public class DataTableImplV4 implements DataTable { dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value))); } else if (key.getValueType() == MetadataValueType.LONG) { dataOutputStream.write(Longs.toByteArray(Long.parseLong(value))); + } else if (key.getValueType() == MetadataValueType.BOOLEAN) { + dataOutputStream.write(DataTableUtils.encodeBoolean(Boolean.parseBoolean(value))); } else { byte[] valueBytes = value.getBytes(UTF_8); dataOutputStream.writeInt(valueBytes.length); @@ -586,6 +588,9 @@ public class DataTableImplV4 implements DataTable { } else if (key.getValueType() == MetadataValueType.LONG) { String value = "" + buffer.getLong(); metadata.put(key.getName(), value); + } else if (key.getValueType() == MetadataValueType.BOOLEAN) { + String value = "" + DataTableUtils.decodeBoolean(buffer); + metadata.put(key.getName(), value); } else { String value = DataTableUtils.decodeString(buffer); metadata.put(key.getName(), value); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java index dc3b40bfd8..bdfe442c16 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java @@ -64,6 +64,17 @@ public class DataTableUtils { return rowSizeInBytes; } + // return boolean as a byte to be sent over Bytebuffer + public static byte[] encodeBoolean(boolean b) { + return new byte[]{(byte) (b ? 1 : 0)}; + } + + public static boolean decodeBoolean(ByteBuffer buffer) + throws IOException { + byte b = buffer.get(); + return (int) b == 1; + } + /** * Helper method to decode string. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java index 665d03a029..37401aeab2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.blocks.results; import java.io.IOException; import java.util.Comparator; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; @@ -83,4 +84,16 @@ public class SelectionResultsBlock extends BaseResultsBlock { throws IOException { return SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, _queryContext.isNullHandlingEnabled()); } + + // provide sorted metadata + @Override + public Map<String, String> getResultsMetadata() { + Map<String, String> metadata = super.getResultsMetadata(); + // All selection result blocks created by operators with orderBy + // come with non-null comparator + if (_comparator != null) { + metadata.put(DataTable.MetadataKey.SORTED.getName(), "true"); + } + return metadata; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java index a8226aee5e..6612556dd1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java @@ -67,8 +67,7 @@ public class SelectionDataTableReducer implements DataTableReducer { // Selection order-by SelectionOperatorService selectionService = new SelectionOperatorService(_queryContext, pair.getLeft(), pair.getRight()); - selectionService.reduceWithOrdering(dataTableMap.values()); - brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering()); + brokerResponseNative.setResultTable(selectionService.reduceWithOrdering(dataTableMap.values())); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java index 7db96f5224..c21cc0ddd2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java @@ -18,9 +18,14 @@ */ package org.apache.pinot.core.query.selection; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; +import java.util.List; import java.util.PriorityQueue; +import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -31,6 +36,7 @@ import org.roaringbitmap.RoaringBitmap; /** + * * The <code>SelectionOperatorService</code> class provides the services for selection queries with * <code>ORDER BY</code>. * <p>Expected behavior: @@ -60,8 +66,20 @@ public class SelectionOperatorService { private final DataSchema _dataSchema; private final int[] _columnIndices; private final int _offset; + private final int _limit; private final int _numRowsToKeep; - private final PriorityQueue<Object[]> _rows; + // TODO: consider moving this to a util class + + /** Util class used for n-way merge */ + private static class MergeItem { + final Object[] _row; + final int _dataTableId; + + MergeItem(Object[] row, int dataTableId) { + _row = row; + _dataTableId = dataTableId; + } + } public SelectionOperatorService(QueryContext queryContext, DataSchema dataSchema, int[] columnIndices) { _queryContext = queryContext; @@ -69,65 +87,226 @@ public class SelectionOperatorService { _columnIndices = columnIndices; // Select rows from offset to offset + limit. _offset = queryContext.getOffset(); - _numRowsToKeep = _offset + queryContext.getLimit(); + _limit = queryContext.getLimit(); + _numRowsToKeep = _offset + _limit; assert queryContext.getOrderByExpressions() != null; - _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), - OrderByComparatorFactory.getComparator(queryContext.getOrderByExpressions(), - _queryContext.isNullHandlingEnabled()).reversed()); } /** - * Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>. - * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted - * Can also consider adding a data table metadata to indicate whether the server side results are sorted + * Reduce multiple sorted (and unsorted) dataTables into a single resultTable, ordered, limited, and offset + * @param dataTables dataTables to be reduced + * @return resultTable + */ + public ResultTable reduceWithOrdering(Collection<DataTable> dataTables) { + boolean allSorted = true; + for (DataTable dataTable : dataTables) { + String sorted = dataTable.getMetadata().get(DataTable.MetadataKey.SORTED.getName()); + if (!Boolean.parseBoolean(sorted)) { + allSorted = false; + break; + } + } + + // TODO: backward compatible, to be removed after 1.2.0 no longer supported + if (!allSorted) { + List<Object[]> heapSortedRows = heapSortDataTable(dataTables); + return new ResultTable(_dataSchema, heapSortedRows); + } // end todo + + if (dataTables.size() == 1) { + // short circuit single table case + DataTable dataTable = dataTables.iterator().next(); + List<Object[]> resultRows = processSingleDataTable(dataTable); + return new ResultTable(_dataSchema, resultRows); + } + + // n-way merge sorted dataTable, we need to access dataTable by index + List<DataTable> dataTableList = new ArrayList<>(dataTables); + List<Object[]> mergedRows = nWayMergeDataTables(dataTableList); + return new ResultTable(_dataSchema, mergedRows); + } + + /** + * Merge sorted dataTables using N-way merge + * @param dataTables sorted dataTables + * @return sorted rows + */ + private List<Object[]> nWayMergeDataTables(List<DataTable> dataTables) { + Comparator<Object[]> comparator = OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(), + _queryContext.isNullHandlingEnabled()); + Comparator<MergeItem> mergeItemComparator = (MergeItem o1, MergeItem o2) -> comparator.compare(o1._row, o2._row); + PriorityQueue<MergeItem> mergeSortRows = + new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), + mergeItemComparator); + + // populate pq + int numDataTables = dataTables.size(); + int[] nextRowIds = new int[numDataTables]; + int[] numRows = new int[numDataTables]; + RoaringBitmap[][] dataTableNullBitmaps = getdataTableNullBitmaps(dataTables); + for (int i = 0; i < numDataTables; i++) { + DataTable dataTable = dataTables.get(i); + numRows[i] = dataTable.getNumberOfRows(); + if (numRows[i] > 0) { + Object[] row = getDataTableRow(dataTable, 0, dataTableNullBitmaps[i]); + MergeItem mergeItem = new MergeItem(row, i); + mergeSortRows.add(mergeItem); + nextRowIds[i] = 1; + } + } + + // merge + List<Object[]> resultRows = new ArrayList<>(); + DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes(); + int numColumns = columnDataTypes.length; + int offsetCounter = _offset; + while (resultRows.size() < _limit && !mergeSortRows.isEmpty()) { + MergeItem item = mergeSortRows.poll(); + if (offsetCounter > 0) { + offsetCounter--; + } else { + Object[] row = item._row; + Object[] resultRow = formatRow(numColumns, row, columnDataTypes); + resultRows.add(resultRow); + } + int dataTableId = item._dataTableId; + int nextRowId = nextRowIds[dataTableId]++; + if (nextRowId >= numRows[dataTableId]) { + continue; + } + DataTable dataTable = dataTables.get(dataTableId); + Object[] row = getDataTableRow(dataTable, nextRowId, dataTableNullBitmaps[dataTableId]); + MergeItem mergeItem = new MergeItem(row, dataTableId); + mergeSortRows.add(mergeItem); + } + return resultRows; + } + + private List<Object[]> processSingleDataTable(DataTable dataTable) { + List<Object[]> resultRows = new ArrayList<>(); + DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes(); + int numColumns = _dataSchema.size(); + int numRows = dataTable.getNumberOfRows(); + + if (numRows <= _offset) { + return Collections.emptyList(); + } + + int start = _offset; + int end = Math.min(numRows, _offset + _limit); + + if (_queryContext.isNullHandlingEnabled()) { + RoaringBitmap[] nullBitmaps = getNullBitmap(dataTable); + for (int rowId = start; rowId < end; rowId++) { + Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + setNullsForRow(nullBitmaps, rowId, row); + Object[] resultRow = formatRow(numColumns, row, columnDataTypes); + resultRows.add(resultRow); + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId); + } + } else { + for (int rowId = start; rowId < end; rowId++) { + Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + Object[] resultRow = formatRow(numColumns, row, columnDataTypes); + resultRows.add(resultRow); + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId); + } + } + return resultRows; + } + + /** get nullBitmaps for dataTables */ + private RoaringBitmap[][] getdataTableNullBitmaps(List<DataTable> dataTables) { + RoaringBitmap[][] dataTableNullBitmaps = new RoaringBitmap[dataTables.size()][]; + if (!_queryContext.isNullHandlingEnabled()) { + return dataTableNullBitmaps; + } + int idx = 0; + for (DataTable dataTable : dataTables) { + dataTableNullBitmaps[idx++] = getNullBitmap(dataTable); + } + return dataTableNullBitmaps; + } + + /** get a single row from dataTable with null handling if nullBitmaps provided */ + private Object[] getDataTableRow(DataTable dataTable, int rowId, @Nullable RoaringBitmap[] nullBitmaps) { + Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + if (nullBitmaps != null) { + setNullsForRow(nullBitmaps, rowId, row); + Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId); + } + return row; + } + + private static void setNullsForRow(RoaringBitmap[] nullBitmaps, int rowId, Object[] row) { + for (int colId = 0; colId < nullBitmaps.length; colId++) { + if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) { + row[colId] = null; + } + } + } + + private static RoaringBitmap[] getNullBitmap(DataTable dataTable) { + RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()]; + for (int colId = 0; colId < nullBitmaps.length; colId++) { + nullBitmaps[colId] = dataTable.getNullRowIds(colId); + } + return nullBitmaps; + } + + private Object[] formatRow(int numColumns, Object[] row, DataSchema.ColumnDataType[] columnDataTypes) { + Object[] resultRow = new Object[numColumns]; + for (int i = 0; i < numColumns; i++) { + Object value = row[_columnIndices[i]]; + if (value != null) { + resultRow[i] = columnDataTypes[i].convertAndFormat(value); + } + } + return resultRow; + } + + /** + * Heapsort unsorted dataTables, this code is unreachable for + * server version >= 1.3.0 as server always performs sorting + * TODO: remove after 1.2.0 no longer supported + * @param dataTables unsorted dataTables + * @return sorted rows */ - public void reduceWithOrdering(Collection<DataTable> dataTables) { + private List<Object[]> heapSortDataTable(Collection<DataTable> dataTables) { + Comparator<Object[]> comparator = OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(), + _queryContext.isNullHandlingEnabled()); + PriorityQueue<Object[]> heapSortRows = + new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY), + comparator.reversed()); + // reduce for (DataTable dataTable : dataTables) { int numRows = dataTable.getNumberOfRows(); if (_queryContext.isNullHandlingEnabled()) { - RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()]; - for (int colId = 0; colId < nullBitmaps.length; colId++) { - nullBitmaps[colId] = dataTable.getNullRowIds(colId); - } + RoaringBitmap[] nullBitmaps = getNullBitmap(dataTable); for (int rowId = 0; rowId < numRows; rowId++) { Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); - for (int colId = 0; colId < nullBitmaps.length; colId++) { - if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) { - row[colId] = null; - } - } - SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep); + setNullsForRow(nullBitmaps, rowId, row); + SelectionOperatorUtils.addToPriorityQueue(row, heapSortRows, _numRowsToKeep); Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId); } } else { for (int rowId = 0; rowId < numRows; rowId++) { Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); - SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep); + SelectionOperatorUtils.addToPriorityQueue(row, heapSortRows, _numRowsToKeep); Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId); } } } - } - - /** - * Renders the selection rows to a {@link ResultTable} object for selection queries with <code>ORDER BY</code>. - */ - public ResultTable renderResultTableWithOrdering() { + // render LinkedList<Object[]> resultRows = new LinkedList<>(); DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes(); int numColumns = columnDataTypes.length; - while (_rows.size() > _offset) { - Object[] row = _rows.poll(); + while (heapSortRows.size() > _offset) { + Object[] row = heapSortRows.poll(); assert row != null; - Object[] resultRow = new Object[numColumns]; - for (int i = 0; i < numColumns; i++) { - Object value = row[_columnIndices[i]]; - if (value != null) { - resultRow[i] = columnDataTypes[i].convertAndFormat(value); - } - } + Object[] resultRow = formatRow(numColumns, row, columnDataTypes); resultRows.addFirst(resultRow); } - return new ResultTable(_dataSchema, resultRows); + return resultRows; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java new file mode 100644 index 0000000000..618fba40e9 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java @@ -0,0 +1,735 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.core.query.selection; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.queries.FluentQueryTest; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; + + +/** query-based tests for selection-orderby */ +public class SelectionOrderByTest { + + @Test + public void testSingleTable() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + // sorted block + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 1); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 2); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 3); + dataTableBuilder.finishRow(); + + DataTable dataTable = dataTableBuilder.build(); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 3); + assertEquals(rows.get(0), new Object[]{1}); + assertEquals(rows.get(1), new Object[]{2}); + assertEquals(rows.get(2), new Object[]{3}); + } + + @Test + public void testSingleTableLimitOffsetSmallerThanResultSize() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 1 OFFSET 1"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + // sorted block + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 1); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 2); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 3); + dataTableBuilder.finishRow(); + + DataTable dataTable = dataTableBuilder.build(); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 1); + assertEquals(rows.get(0), new Object[]{2}); + } + + @Test + public void testSingleTableLimitOffsetLargerThanResultSize() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 3 OFFSET 1"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + // sorted block + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 1); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 2); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 3); + dataTableBuilder.finishRow(); + + DataTable dataTable = dataTableBuilder.build(); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 2); + assertEquals(rows.get(0), new Object[]{2}); + assertEquals(rows.get(1), new Object[]{3}); + } + + @Test + public void testSingleTableOffsetLargerThanResultSize() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 1 OFFSET 4"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); + // sorted block + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 1); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 2); + dataTableBuilder.finishRow(); + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, 3); + dataTableBuilder.finishRow(); + + DataTable dataTable = dataTableBuilder.build(); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 0); + } + + @Test + public void testSingleTableWithNull() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + + List<Object[]> unsortedRows = new ArrayList<>(); + unsortedRows.add(new Object[]{null}); + unsortedRows.add(new Object[]{2}); + unsortedRows.add(new Object[]{3}); + + DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(unsortedRows, dataSchema, true); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 3); + assertEquals(rows.get(0), new Object[]{0}); + assertEquals(rows.get(1), new Object[]{2}); + assertEquals(rows.get(2), new Object[]{3}); + } + + @Test + public void testSingleTableWithNullHandlingEnabled() + throws IOException { + BrokerReduceService brokerReduceService = + new BrokerReduceService( + new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2))); + BrokerRequest brokerRequest = + CalciteSqlCompiler.compileToBrokerRequest( + "SET enableNullHandling=true; SELECT col1 FROM testTable ORDER BY col1"); + DataSchema dataSchema = + new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}); + + List<Object[]> unsortedRows = new ArrayList<>(); + unsortedRows.add(new Object[]{1}); + unsortedRows.add(new Object[]{2}); + unsortedRows.add(new Object[]{null}); + + DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(unsortedRows, dataSchema, true); + dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true"); + Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); + int numSortedInstances = 1; + for (int i = 0; i < numSortedInstances; i++) { + ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE); + dataTableMap.put(instance, dataTable); + } + long reduceTimeoutMs = 100000; + BrokerResponseNative brokerResponse = + brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs, + mock(BrokerMetrics.class)); + brokerReduceService.shutDown(); + + ResultTable resultTable = brokerResponse.getResultTable(); + List<Object[]> rows = resultTable.getRows(); + assertEquals(rows.size(), 3); + assertEquals(rows.get(0), new Object[]{1}); + assertEquals(rows.get(1), new Object[]{2}); + assertEquals(rows.get(2), new Object[]{null}); + } + + @Test + public void list() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2}, + new Object[]{null} + ) + .whenQuery("select myField from testTable order by myField") + .thenResultIs("INTEGER", + "-2147483648", + "1", + "2", + "3" + ); + } + + @Test + public void listNullHandlingEnabled() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2}, + new Object[]{null} + ) + .whenQuery("select myField from testTable order by myField") + .thenResultIs("INTEGER", + "1", + "2", + "3", + "null" + ); + } + + @Test + public void listTwoFields() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, null} + ) + .whenQuery("select field1, field2 from testTable2 order by field1") + .thenResultIs("INTEGER|INTEGER", + "-2147483648|-2147483648", + "1|5", + "2|3", + "3|4" + ); + } + + @Test + public void listTwoFieldsNullHandlingEnabled() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1") + .thenResultIs("INTEGER|INTEGER", + "1|5", + "2|3", + "3|4", + "null|2" + ); + } + + @Test + public void listTwoFieldsNullHandlingEnabledNullsFirst() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 nulls first") + .thenResultIs("INTEGER|INTEGER", + "null|2", + "1|5", + "2|3", + "3|4" + ); + } + + @Test + public void listTwoFieldsDesc() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, null} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 desc") + .thenResultIs("INTEGER|INTEGER", + "3|4", + "2|3", + "1|5", + "-2147483648|-2147483648" + ); + } + + @Test + public void listTwoFieldsNullHandlingEnabledDesc() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{4, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{3, 0}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 desc") + .thenResultIs("INTEGER|INTEGER", + "null|2", + "4|4", + "3|0", + "2|3", + "1|5" + ); + } + + @Test + public void listTwoFieldsNullHandlingEnabledDescNullsLast() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{4, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{3, 0}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 desc nulls last") + .thenResultIs("INTEGER|INTEGER", + "4|4", + "3|0", + "2|3", + "1|5", + "null|2" + ); + } + + @Test + public void listSortonTwoFields() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4}, + new Object[]{2, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1, field2") + .thenResultIs("INTEGER|INTEGER", + "-2147483648|2", + "1|5", + "2|3", + "2|4", + "3|4" + ); + } + + @Test + public void listSortonTwoFieldsNullHandlingEnabled() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4}, + new Object[]{2, 4}, + new Object[]{null, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1, field2") + .thenResultIs("INTEGER|INTEGER", + "1|5", + "2|3", + "2|4", + "3|4", + "null|2", + "null|4" + ); + } + + @Test + public void listSortonTwoFieldsOneDesc() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4}, + new Object[]{2, 4}, + new Object[]{null, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 desc, field2") + .thenResultIs("INTEGER|INTEGER", + "3|4", + "2|3", + "2|4", + "1|5", + "-2147483648|2", + "-2147483648|4" + ); + } + + @Test + public void listSortonTwoFieldsNullHandlingEnabledOneDesc() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(true) + .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1, 5}, + new Object[]{3, 4}, + new Object[]{2, 4}, + new Object[]{null, 4} + ) + .andOnSecondInstance( + new Object[]{2, 3}, + new Object[]{null, 2} + ) + .whenQuery("select field1, field2 from testTable2 order by field1 desc, field2") + .thenResultIs("INTEGER|INTEGER", + "null|2", + "null|4", + "3|4", + "2|3", + "2|4", + "1|5" + ); + } + + @Test + public void listOffset() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2}, + new Object[]{null} + ) + .whenQuery("select myField from testTable order by myField offset 1") + .thenResultIs("INTEGER", + "1", + "2", + "3" + ); + } + + @Test + public void listOffsetLimit() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2}, + new Object[]{null} + ) + .whenQuery("select myField from testTable order by myField offset 1 limit 2") + .thenResultIs("INTEGER", + "1", + "2" + ); + } + + @Test + public void listOffsetLimitSmallerThanResultSize() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2} + ) + .whenQuery("select myField from testTable order by myField offset 1 limit 3") + .thenResultIs("INTEGER", + "2", + "3" + ); + } + + @Test + public void listOffsetLargerThanResult() { + FluentQueryTest.withBaseDir(_baseDir) + .withNullHandling(false) + .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG) + .onFirstInstance( + new Object[]{1}, + new Object[]{3} + ) + .andOnSecondInstance( + new Object[]{2}, + new Object[]{null} + ) + .whenQuery("select myField from testTable order by myField offset 10") + .thenResultIs("INTEGER" + ); + } + + // utils --- + + @DataProvider(name = "nullHandlingEnabled") + public Object[][] nullHandlingEnabled() { + return new Object[][]{ + {false}, {true} + }; + } + + private static final FieldSpec.DataType[] VALID_DATA_TYPES = new FieldSpec.DataType[]{ + FieldSpec.DataType.INT, + FieldSpec.DataType.LONG, + FieldSpec.DataType.FLOAT, + FieldSpec.DataType.DOUBLE, + FieldSpec.DataType.STRING, + FieldSpec.DataType.BYTES, + FieldSpec.DataType.BIG_DECIMAL, + FieldSpec.DataType.TIMESTAMP, + FieldSpec.DataType.BOOLEAN + }; + + protected static final Map<FieldSpec.DataType, Schema> SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS = + Arrays.stream(VALID_DATA_TYPES) + .collect(Collectors.toMap(dt -> dt, dt -> new Schema.SchemaBuilder() + .setSchemaName("testTable") + .setEnableColumnBasedNullHandling(true) + .addDimensionField("myField", dt, f -> f.setNullable(true)) + .build())); + + protected static final Map<FieldSpec.DataType, Schema> TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS = + Arrays.stream(VALID_DATA_TYPES) + .collect(Collectors.toMap(dt -> dt, dt -> new Schema.SchemaBuilder() + .setSchemaName("testTable2") + .setEnableColumnBasedNullHandling(true) + .addDimensionField("field1", dt, f -> f.setNullable(true)) + .addDimensionField("field2", dt, f -> f.setNullable(true)) + .build())); + + protected static final TableConfig SINGLE_FIELD_TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE) + .setTableName("testTable") + .build(); + + protected static final TableConfig TWO_FIELDS_TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE) + .setTableName("testTable") + .build(); + + protected File _baseDir; + + @BeforeClass + void createBaseDir() { + try { + _baseDir = Files.createTempDirectory(getClass().getSimpleName()).toFile(); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @AfterClass + void destroyBaseDir() + throws IOException { + if (_baseDir != null) { + FileUtils.deleteDirectory(_baseDir); + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 9a55c8aa65..3491f5c43b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -298,6 +298,8 @@ public class LeafOperator extends MultiStageOperator { case NUM_CONSUMING_SEGMENTS_MATCHED: _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue())); break; + case SORTED: + break; default: { throw new IllegalArgumentException("Unhandled V1 execution stat: " + entry.getKey()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org