This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d8432efbf [SSE] Remove redundant sorting when broker receives sorted block (#16195)
0d8432efbf is described below

commit 0d8432efbfa59e6a7d90c01b2ffc0db2e595f131
Author: Song Fu <131259315+songw...@users.noreply.github.com>
AuthorDate: Mon Jun 30 17:06:05 2025 -0700

    [SSE] Remove redundant sorting when broker receives sorted block (#16195)
---
 .../apache/pinot/common/datatable/DataTable.java   |   8 +-
 .../pinot/common/datatable/DataTableImplV4.java    |   5 +
 .../pinot/common/datatable/DataTableUtils.java     |  11 +
 .../blocks/results/SelectionResultsBlock.java      |  13 +
 .../query/reduce/SelectionDataTableReducer.java    |   3 +-
 .../query/selection/SelectionOperatorService.java  | 251 ++++++-
 .../core/query/selection/SelectionOrderByTest.java | 735 +++++++++++++++++++++
 .../pinot/query/runtime/operator/LeafOperator.java |   2 +
 8 files changed, 987 insertions(+), 41 deletions(-)
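
In short: servers now tag sorted selection ORDER BY responses with a SORTED metadata flag, and when every received block carries it, the broker replaces its heap-based re-sort with an n-way merge, falling back to the old heap sort otherwise. As a rough illustration of the merge technique the patch applies (a simplified standalone sketch, not the patch code; the real implementation below works on DataTables and tracks null bitmaps):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Merge k individually sorted lists with OFFSET/LIMIT, without a full re-sort.
    static List<Object[]> kWayMerge(List<List<Object[]>> sortedLists, Comparator<Object[]> cmp,
        int offset, int limit) {
      // Heap entry {listId, rowId}, ordered by the row each cursor points at.
      PriorityQueue<int[]> heap = new PriorityQueue<>(
          (a, b) -> cmp.compare(sortedLists.get(a[0]).get(a[1]), sortedLists.get(b[0]).get(b[1])));
      for (int i = 0; i < sortedLists.size(); i++) {
        if (!sortedLists.get(i).isEmpty()) {
          heap.add(new int[]{i, 0});
        }
      }
      List<Object[]> result = new ArrayList<>();
      while (result.size() < limit && !heap.isEmpty()) {
        int[] cursor = heap.poll();
        if (offset > 0) {
          offset--;  // rows covered by OFFSET are skipped, not materialized
        } else {
          result.add(sortedLists.get(cursor[0]).get(cursor[1]));
        }
        if (++cursor[1] < sortedLists.get(cursor[0]).size()) {
          heap.add(cursor);  // advance the cursor within the same list
        }
      }
      return result;
    }

Each poll costs O(log k) for k servers, so the broker does roughly O((offset + limit) * log k) work instead of heap-sorting every received row.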

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 6b0d70f657..bd39b76fd9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -93,7 +93,7 @@ public interface DataTable {
   DataTable toDataOnlyDataTable();
 
   enum MetadataValueType {
-    INT, LONG, STRING
+    INT, LONG, STRING, BOOLEAN
   }
 
  /* The MetadataKey is used since V3, where we present metadata as Map<MetadataKey, String>
@@ -144,11 +144,13 @@ public interface DataTable {
     MAX_ROWS_IN_JOIN_REACHED(34, "maxRowsInJoinReached", MetadataValueType.STRING),
     NUM_GROUPS_WARNING_LIMIT_REACHED(35, "numGroupsWarningLimitReached", MetadataValueType.STRING),
     THREAD_MEM_ALLOCATED_BYTES(36, "threadMemAllocatedBytes", MetadataValueType.LONG),
-    RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", MetadataValueType.LONG);
+    RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", MetadataValueType.LONG),
+    // NOTE: for server after release 1.3.0 this flag is always set to true since servers now perform sorting
+    SORTED(38, "sorted", MetadataValueType.BOOLEAN);
 
     // We keep this constant to track the max id added so far for backward compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 37;
+    private static final int MAX_ID = 38;
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index f709f79a72..1a4aa9f503 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -549,6 +549,8 @@ public class DataTableImplV4 implements DataTable {
         dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
       } else if (key.getValueType() == MetadataValueType.LONG) {
         dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
+      } else if (key.getValueType() == MetadataValueType.BOOLEAN) {
+        dataOutputStream.write(DataTableUtils.encodeBoolean(Boolean.parseBoolean(value)));
       } else {
         byte[] valueBytes = value.getBytes(UTF_8);
         dataOutputStream.writeInt(valueBytes.length);
@@ -586,6 +588,9 @@ public class DataTableImplV4 implements DataTable {
       } else if (key.getValueType() == MetadataValueType.LONG) {
         String value = "" + buffer.getLong();
         metadata.put(key.getName(), value);
+      } else if (key.getValueType() == MetadataValueType.BOOLEAN) {
+        String value = "" + DataTableUtils.decodeBoolean(buffer);
+        metadata.put(key.getName(), value);
       } else {
         String value = DataTableUtils.decodeString(buffer);
         metadata.put(key.getName(), value);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
index dc3b40bfd8..bdfe442c16 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
@@ -64,6 +64,17 @@ public class DataTableUtils {
     return rowSizeInBytes;
   }
 
+  // return boolean as a byte to be sent over Bytebuffer
+  public static byte[] encodeBoolean(boolean b) {
+    return new byte[]{(byte) (b ? 1 : 0)};
+  }
+
+  public static boolean decodeBoolean(ByteBuffer buffer)
+      throws IOException {
+    byte b = buffer.get();
+    return (int) b == 1;
+  }
+
   /**
    * Helper method to decode string.
    */
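
For reference, the new helpers serialize a boolean as a single byte (1 for true, 0 for false). A minimal round-trip sketch of that encoding (standalone, mirroring encodeBoolean/decodeBoolean above):

    import java.nio.ByteBuffer;

    byte[] encoded = new byte[]{(byte) 1};                   // what encodeBoolean(true) produces
    boolean decoded = ByteBuffer.wrap(encoded).get() == 1;   // what decodeBoolean reads back: true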
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 665d03a029..37401aeab2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.blocks.results;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -83,4 +84,16 @@ public class SelectionResultsBlock extends BaseResultsBlock {
       throws IOException {
     return SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, _queryContext.isNullHandlingEnabled());
   }
+
+  // provide sorted metadata
+  @Override
+  public Map<String, String> getResultsMetadata() {
+    Map<String, String> metadata = super.getResultsMetadata();
+    // All selection result blocks created by operators with orderBy
+    // come with non-null comparator
+    if (_comparator != null) {
+      metadata.put(DataTable.MetadataKey.SORTED.getName(), "true");
+    }
+    return metadata;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
index a8226aee5e..6612556dd1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -67,8 +67,7 @@ public class SelectionDataTableReducer implements DataTableReducer {
       // Selection order-by
       SelectionOperatorService selectionService =
           new SelectionOperatorService(_queryContext, pair.getLeft(), pair.getRight());
-      selectionService.reduceWithOrdering(dataTableMap.values());
-      brokerResponseNative.setResultTable(selectionService.renderResultTableWithOrdering());
+      brokerResponseNative.setResultTable(selectionService.reduceWithOrdering(dataTableMap.values()));
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 7db96f5224..c21cc0ddd2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.core.query.selection;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.PriorityQueue;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -31,6 +36,7 @@ import org.roaringbitmap.RoaringBitmap;
 
 
 /**
+ *
  * The <code>SelectionOperatorService</code> class provides the services for selection queries with
  * <code>ORDER BY</code>.
  * <p>Expected behavior:
@@ -60,8 +66,20 @@ public class SelectionOperatorService {
   private final DataSchema _dataSchema;
   private final int[] _columnIndices;
   private final int _offset;
+  private final int _limit;
   private final int _numRowsToKeep;
-  private final PriorityQueue<Object[]> _rows;
+  // TODO: consider moving this to a util class
+
+  /** Util class used for n-way merge */
+  private static class MergeItem {
+    final Object[] _row;
+    final int _dataTableId;
+
+    MergeItem(Object[] row, int dataTableId) {
+      _row = row;
+      _dataTableId = dataTableId;
+    }
+  }
 
   public SelectionOperatorService(QueryContext queryContext, DataSchema dataSchema, int[] columnIndices) {
     _queryContext = queryContext;
@@ -69,65 +87,226 @@ public class SelectionOperatorService {
     _columnIndices = columnIndices;
     // Select rows from offset to offset + limit.
     _offset = queryContext.getOffset();
-    _numRowsToKeep = _offset + queryContext.getLimit();
+    _limit = queryContext.getLimit();
+    _numRowsToKeep = _offset + _limit;
     assert queryContext.getOrderByExpressions() != null;
-    _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        OrderByComparatorFactory.getComparator(queryContext.getOrderByExpressions(),
-            _queryContext.isNullHandlingEnabled()).reversed());
   }
 
   /**
-   * Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>.
-   * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted
-   *       Can also consider adding a data table metadata to indicate whether the server side results are sorted
+   * Reduce multiple sorted (and unsorted) dataTables into a single resultTable, ordered, limited, and offset
+   * @param dataTables dataTables to be reduced
+   * @return resultTable
+   */
+  public ResultTable reduceWithOrdering(Collection<DataTable> dataTables) {
+    boolean allSorted = true;
+    for (DataTable dataTable : dataTables) {
+      String sorted = dataTable.getMetadata().get(DataTable.MetadataKey.SORTED.getName());
+      if (!Boolean.parseBoolean(sorted)) {
+        allSorted = false;
+        break;
+      }
+    }
+
+    // TODO: backward compatible, to be removed after 1.2.0 no longer supported
+    if (!allSorted) {
+      List<Object[]> heapSortedRows = heapSortDataTable(dataTables);
+      return new ResultTable(_dataSchema, heapSortedRows);
+    } // end todo
+
+    if (dataTables.size() == 1) {
+      // short circuit single table case
+      DataTable dataTable = dataTables.iterator().next();
+      List<Object[]> resultRows = processSingleDataTable(dataTable);
+      return new ResultTable(_dataSchema, resultRows);
+    }
+
+    // n-way merge sorted dataTable, we need to access dataTable by index
+    List<DataTable> dataTableList = new ArrayList<>(dataTables);
+    List<Object[]> mergedRows = nWayMergeDataTables(dataTableList);
+    return new ResultTable(_dataSchema, mergedRows);
+  }
+
+  /**
+   * Merge sorted dataTables using N-way merge
+   * @param dataTables sorted dataTables
+   * @return sorted rows
+   */
+  private List<Object[]> nWayMergeDataTables(List<DataTable> dataTables) {
+    Comparator<Object[]> comparator = OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(),
+        _queryContext.isNullHandlingEnabled());
+    Comparator<MergeItem> mergeItemComparator = (MergeItem o1, MergeItem o2) -> comparator.compare(o1._row, o2._row);
+    PriorityQueue<MergeItem> mergeSortRows =
+        new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
+            mergeItemComparator);
+
+    // populate pq
+    int numDataTables = dataTables.size();
+    int[] nextRowIds = new int[numDataTables];
+    int[] numRows = new int[numDataTables];
+    RoaringBitmap[][] dataTableNullBitmaps = getdataTableNullBitmaps(dataTables);
+    for (int i = 0; i < numDataTables; i++) {
+      DataTable dataTable = dataTables.get(i);
+      numRows[i] = dataTable.getNumberOfRows();
+      if (numRows[i] > 0) {
+        Object[] row = getDataTableRow(dataTable, 0, dataTableNullBitmaps[i]);
+        MergeItem mergeItem = new MergeItem(row, i);
+        mergeSortRows.add(mergeItem);
+        nextRowIds[i] = 1;
+      }
+    }
+
+    // merge
+    List<Object[]> resultRows = new ArrayList<>();
+    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    int offsetCounter = _offset;
+    while (resultRows.size() < _limit && !mergeSortRows.isEmpty()) {
+      MergeItem item = mergeSortRows.poll();
+      if (offsetCounter > 0) {
+        offsetCounter--;
+      } else {
+        Object[] row = item._row;
+        Object[] resultRow = formatRow(numColumns, row, columnDataTypes);
+        resultRows.add(resultRow);
+      }
+      int dataTableId = item._dataTableId;
+      int nextRowId = nextRowIds[dataTableId]++;
+      if (nextRowId >= numRows[dataTableId]) {
+        continue;
+      }
+      DataTable dataTable = dataTables.get(dataTableId);
+      Object[] row = getDataTableRow(dataTable, nextRowId, dataTableNullBitmaps[dataTableId]);
+      MergeItem mergeItem = new MergeItem(row, dataTableId);
+      mergeSortRows.add(mergeItem);
+    }
+    return resultRows;
+  }
+
+  private List<Object[]> processSingleDataTable(DataTable dataTable) {
+    List<Object[]> resultRows = new ArrayList<>();
+    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    int numColumns = _dataSchema.size();
+    int numRows = dataTable.getNumberOfRows();
+
+    if (numRows <= _offset) {
+      return Collections.emptyList();
+    }
+
+    int start = _offset;
+    int end = Math.min(numRows, _offset + _limit);
+
+    if (_queryContext.isNullHandlingEnabled()) {
+      RoaringBitmap[] nullBitmaps = getNullBitmap(dataTable);
+      for (int rowId = start; rowId < end; rowId++) {
+        Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+        setNullsForRow(nullBitmaps, rowId, row);
+        Object[] resultRow = formatRow(numColumns, row, columnDataTypes);
+        resultRows.add(resultRow);
+        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
+      }
+    } else {
+      for (int rowId = start; rowId < end; rowId++) {
+        Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+        Object[] resultRow = formatRow(numColumns, row, columnDataTypes);
+        resultRows.add(resultRow);
+        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
+      }
+    }
+    return resultRows;
+  }
+
+  /** get nullBitmaps for dataTables */
+  private RoaringBitmap[][] getdataTableNullBitmaps(List<DataTable> dataTables) {
+    RoaringBitmap[][] dataTableNullBitmaps = new RoaringBitmap[dataTables.size()][];
+    if (!_queryContext.isNullHandlingEnabled()) {
+      return dataTableNullBitmaps;
+    }
+    int idx = 0;
+    for (DataTable dataTable : dataTables) {
+      dataTableNullBitmaps[idx++] = getNullBitmap(dataTable);
+    }
+    return dataTableNullBitmaps;
+  }
+
+  /** get a single row from dataTable with null handling if nullBitmaps provided */
+  private Object[] getDataTableRow(DataTable dataTable, int rowId, @Nullable RoaringBitmap[] nullBitmaps) {
+    Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+    if (nullBitmaps != null) {
+      setNullsForRow(nullBitmaps, rowId, row);
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
+    }
+    return row;
+  }
+
+  private static void setNullsForRow(RoaringBitmap[] nullBitmaps, int rowId, Object[] row) {
+    for (int colId = 0; colId < nullBitmaps.length; colId++) {
+      if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
+        row[colId] = null;
+      }
+    }
+  }
+
+  private static RoaringBitmap[] getNullBitmap(DataTable dataTable) {
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()];
+    for (int colId = 0; colId < nullBitmaps.length; colId++) {
+      nullBitmaps[colId] = dataTable.getNullRowIds(colId);
+    }
+    return nullBitmaps;
+  }
+
+  private Object[] formatRow(int numColumns, Object[] row, DataSchema.ColumnDataType[] columnDataTypes) {
+    Object[] resultRow = new Object[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      Object value = row[_columnIndices[i]];
+      if (value != null) {
+        resultRow[i] = columnDataTypes[i].convertAndFormat(value);
+      }
+    }
+    return resultRow;
+  }
+
+  /**
+   * Heapsort unsorted dataTables, this code is unreachable for
+   * server version >= 1.3.0 as server always performs sorting
+   * TODO: remove after 1.2.0 no longer supported
+   * @param dataTables unsorted dataTables
+   * @return sorted rows
    */
-  public void reduceWithOrdering(Collection<DataTable> dataTables) {
+  private List<Object[]> heapSortDataTable(Collection<DataTable> dataTables) {
+    Comparator<Object[]> comparator = OrderByComparatorFactory.getComparator(_queryContext.getOrderByExpressions(),
+        _queryContext.isNullHandlingEnabled());
+    PriorityQueue<Object[]> heapSortRows =
+        new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
+            comparator.reversed());
+    // reduce
     for (DataTable dataTable : dataTables) {
       int numRows = dataTable.getNumberOfRows();
       if (_queryContext.isNullHandlingEnabled()) {
-        RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()];
-        for (int colId = 0; colId < nullBitmaps.length; colId++) {
-          nullBitmaps[colId] = dataTable.getNullRowIds(colId);
-        }
+        RoaringBitmap[] nullBitmaps = getNullBitmap(dataTable);
         for (int rowId = 0; rowId < numRows; rowId++) {
           Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
-          for (int colId = 0; colId < nullBitmaps.length; colId++) {
-            if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
-              row[colId] = null;
-            }
-          }
-          SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+          setNullsForRow(nullBitmaps, rowId, row);
+          SelectionOperatorUtils.addToPriorityQueue(row, heapSortRows, _numRowsToKeep);
           Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
         }
       } else {
         for (int rowId = 0; rowId < numRows; rowId++) {
           Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
-          SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+          SelectionOperatorUtils.addToPriorityQueue(row, heapSortRows, _numRowsToKeep);
           Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
         }
       }
     }
-  }
-
-  /**
-   * Renders the selection rows to a {@link ResultTable} object for selection queries with <code>ORDER BY</code>.
-   */
-  public ResultTable renderResultTableWithOrdering() {
+    // render
     LinkedList<Object[]> resultRows = new LinkedList<>();
     DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
-    while (_rows.size() > _offset) {
-      Object[] row = _rows.poll();
+    while (heapSortRows.size() > _offset) {
+      Object[] row = heapSortRows.poll();
       assert row != null;
-      Object[] resultRow = new Object[numColumns];
-      for (int i = 0; i < numColumns; i++) {
-        Object value = row[_columnIndices[i]];
-        if (value != null) {
-          resultRow[i] = columnDataTypes[i].convertAndFormat(value);
-        }
-      }
+      Object[] resultRow = formatRow(numColumns, row, columnDataTypes);
       resultRows.addFirst(resultRow);
     }
-    return new ResultTable(_dataSchema, resultRows);
+    return resultRows;
   }
 }
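
One backward-compatibility detail worth noting: a data table from a pre-1.3.0 server simply has no "sorted" entry in its metadata, and Boolean.parseBoolean(null) returns false, so mixed-version responses naturally take the heap-sort fallback rather than the merge path. A tiny sketch of that behavior (illustrative only):

    // Absent metadata key -> null -> parseBoolean(null) == false -> heap-sort fallback
    String sorted = null;  // what getMetadata().get(...) yields from an older server
    assert !Boolean.parseBoolean(sorted);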
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java
new file mode 100644
index 0000000000..618fba40e9
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOrderByTest.java
@@ -0,0 +1,735 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.selection;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+
+/** query-based tests for selection-orderby */
+public class SelectionOrderByTest {
+
+  @Test
+  public void testSingleTable()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    // sorted block
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 1);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 2);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 3);
+    dataTableBuilder.finishRow();
+
+    DataTable dataTable = dataTableBuilder.build();
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 3);
+    assertEquals(rows.get(0), new Object[]{1});
+    assertEquals(rows.get(1), new Object[]{2});
+    assertEquals(rows.get(2), new Object[]{3});
+  }
+
+  @Test
+  public void testSingleTableLimitOffsetSmallerThanResultSize()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 1 OFFSET 1");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    // sorted block
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 1);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 2);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 3);
+    dataTableBuilder.finishRow();
+
+    DataTable dataTable = dataTableBuilder.build();
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    assertEquals(rows.get(0), new Object[]{2});
+  }
+
+  @Test
+  public void testSingleTableLimitOffsetLargerThanResultSize()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 3 OFFSET 1");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    // sorted block
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 1);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 2);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 3);
+    dataTableBuilder.finishRow();
+
+    DataTable dataTable = dataTableBuilder.build();
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 2);
+    assertEquals(rows.get(0), new Object[]{2});
+    assertEquals(rows.get(1), new Object[]{3});
+  }
+
+  @Test
+  public void testSingleTableOffsetLargerThanResultSize()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1 LIMIT 1 OFFSET 4");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    // sorted block
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 1);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 2);
+    dataTableBuilder.finishRow();
+    dataTableBuilder.startRow();
+    dataTableBuilder.setColumn(0, 3);
+    dataTableBuilder.finishRow();
+
+    DataTable dataTable = dataTableBuilder.build();
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 0);
+  }
+
+  @Test
+  public void testSingleTableWithNull()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable ORDER BY col1");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+    List<Object[]> unsortedRows = new ArrayList<>();
+    unsortedRows.add(new Object[]{null});
+    unsortedRows.add(new Object[]{2});
+    unsortedRows.add(new Object[]{3});
+
+    DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(unsortedRows, dataSchema, true);
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 3);
+    assertEquals(rows.get(0), new Object[]{0});
+    assertEquals(rows.get(1), new Object[]{2});
+    assertEquals(rows.get(2), new Object[]{3});
+  }
+
+  @Test
+  public void testSingleTableWithNullHandlingEnabled()
+      throws IOException {
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(
+            new PinotConfiguration(Map.of(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+    BrokerRequest brokerRequest =
+        CalciteSqlCompiler.compileToBrokerRequest(
+            "SET enableNullHandling=true; SELECT col1 FROM testTable ORDER BY 
col1");
+    DataSchema dataSchema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+
+    List<Object[]> unsortedRows = new ArrayList<>();
+    unsortedRows.add(new Object[]{1});
+    unsortedRows.add(new Object[]{2});
+    unsortedRows.add(new Object[]{null});
+
+    DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(unsortedRows, dataSchema, true);
+    dataTable.getMetadata().put(DataTable.MetadataKey.SORTED.getName(), "true");
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    int numSortedInstances = 1;
+    for (int i = 0; i < numSortedInstances; i++) {
+      ServerRoutingInstance instance = new ServerRoutingInstance("localhost", i, TableType.OFFLINE);
+      dataTableMap.put(instance, dataTable);
+    }
+    long reduceTimeoutMs = 100000;
+    BrokerResponseNative brokerResponse =
+        brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest, dataTableMap, reduceTimeoutMs,
+            mock(BrokerMetrics.class));
+    brokerReduceService.shutDown();
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 3);
+    assertEquals(rows.get(0), new Object[]{1});
+    assertEquals(rows.get(1), new Object[]{2});
+    assertEquals(rows.get(2), new Object[]{null});
+  }
+
+  @Test
+  public void list() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2},
+            new Object[]{null}
+        )
+        .whenQuery("select myField from testTable order by myField")
+        .thenResultIs("INTEGER",
+            "-2147483648",
+            "1",
+            "2",
+            "3"
+        );
+  }
+
+  @Test
+  public void listNullHandlingEnabled() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2},
+            new Object[]{null}
+        )
+        .whenQuery("select myField from testTable order by myField")
+        .thenResultIs("INTEGER",
+            "1",
+            "2",
+            "3",
+            "null"
+        );
+  }
+
+  @Test
+  public void listTwoFields() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, null}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1")
+        .thenResultIs("INTEGER|INTEGER",
+            "-2147483648|-2147483648",
+            "1|5",
+            "2|3",
+            "3|4"
+        );
+  }
+
+  @Test
+  public void listTwoFieldsNullHandlingEnabled() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1")
+        .thenResultIs("INTEGER|INTEGER",
+            "1|5",
+            "2|3",
+            "3|4",
+            "null|2"
+        );
+  }
+
+  @Test
+  public void listTwoFieldsNullHandlingEnabledNullsFirst() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 
nulls first")
+        .thenResultIs("INTEGER|INTEGER",
+            "null|2",
+            "1|5",
+            "2|3",
+            "3|4"
+        );
+  }
+
+  @Test
+  public void listTwoFieldsDesc() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, null}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 
desc")
+        .thenResultIs("INTEGER|INTEGER",
+            "3|4",
+            "2|3",
+            "1|5",
+            "-2147483648|-2147483648"
+        );
+  }
+
+  @Test
+  public void listTwoFieldsNullHandlingEnabledDesc() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{4, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{3, 0},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 
desc")
+        .thenResultIs("INTEGER|INTEGER",
+            "null|2",
+            "4|4",
+            "3|0",
+            "2|3",
+            "1|5"
+        );
+  }
+
+  @Test
+  public void listTwoFieldsNullHandlingEnabledDescNullsLast() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{4, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{3, 0},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 desc 
nulls last")
+        .thenResultIs("INTEGER|INTEGER",
+            "4|4",
+            "3|0",
+            "2|3",
+            "1|5",
+            "null|2"
+        );
+  }
+
+  @Test
+  public void listSortonTwoFields() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4},
+            new Object[]{2, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1, 
field2")
+        .thenResultIs("INTEGER|INTEGER",
+            "-2147483648|2",
+            "1|5",
+            "2|3",
+            "2|4",
+            "3|4"
+        );
+  }
+
+  @Test
+  public void listSortonTwoFieldsNullHandlingEnabled() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4},
+            new Object[]{2, 4},
+            new Object[]{null, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1, 
field2")
+        .thenResultIs("INTEGER|INTEGER",
+            "1|5",
+            "2|3",
+            "2|4",
+            "3|4",
+            "null|2",
+            "null|4"
+        );
+  }
+
+  @Test
+  public void listSortonTwoFieldsOneDesc() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4},
+            new Object[]{2, 4},
+            new Object[]{null, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 
desc, field2")
+        .thenResultIs("INTEGER|INTEGER",
+            "3|4",
+            "2|3",
+            "2|4",
+            "1|5",
+            "-2147483648|2",
+            "-2147483648|4"
+        );
+  }
+
+  @Test
+  public void listSortonTwoFieldsNullHandlingEnabledOneDesc() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), TWO_FIELDS_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1, 5},
+            new Object[]{3, 4},
+            new Object[]{2, 4},
+            new Object[]{null, 4}
+        )
+        .andOnSecondInstance(
+            new Object[]{2, 3},
+            new Object[]{null, 2}
+        )
+        .whenQuery("select field1, field2 from testTable2 order by field1 
desc, field2")
+        .thenResultIs("INTEGER|INTEGER",
+            "null|2",
+            "null|4",
+            "3|4",
+            "2|3",
+            "2|4",
+            "1|5"
+        );
+  }
+
+  @Test
+  public void listOffset() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2},
+            new Object[]{null}
+        )
+        .whenQuery("select myField from testTable order by myField offset 1")
+        .thenResultIs("INTEGER",
+            "1",
+            "2",
+            "3"
+        );
+  }
+
+  @Test
+  public void listOffsetLimit() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2},
+            new Object[]{null}
+        )
+        .whenQuery("select myField from testTable order by myField offset 1 
limit 2")
+        .thenResultIs("INTEGER",
+            "1",
+            "2"
+        );
+  }
+
+  @Test
+  public void listOffsetLimitSmallerThanResultSize() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2}
+        )
+        .whenQuery("select myField from testTable order by myField offset 1 
limit 3")
+        .thenResultIs("INTEGER",
+            "2",
+            "3"
+        );
+  }
+
+  @Test
+  public void listOffsetLargerThanResult() {
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[]{1},
+            new Object[]{3}
+        )
+        .andOnSecondInstance(
+            new Object[]{2},
+            new Object[]{null}
+        )
+        .whenQuery("select myField from testTable order by myField offset 10")
+        .thenResultIs("INTEGER"
+        );
+  }
+
+  // utils ---
+
+  @DataProvider(name = "nullHandlingEnabled")
+  public Object[][] nullHandlingEnabled() {
+    return new Object[][]{
+        {false}, {true}
+    };
+  }
+
+  private static final FieldSpec.DataType[] VALID_DATA_TYPES = new FieldSpec.DataType[]{
+      FieldSpec.DataType.INT,
+      FieldSpec.DataType.LONG,
+      FieldSpec.DataType.FLOAT,
+      FieldSpec.DataType.DOUBLE,
+      FieldSpec.DataType.STRING,
+      FieldSpec.DataType.BYTES,
+      FieldSpec.DataType.BIG_DECIMAL,
+      FieldSpec.DataType.TIMESTAMP,
+      FieldSpec.DataType.BOOLEAN
+  };
+
+  protected static final Map<FieldSpec.DataType, Schema> SINGLE_FIELD_NULLABLE_DIMENSION_SCHEMAS =
+      Arrays.stream(VALID_DATA_TYPES)
+          .collect(Collectors.toMap(dt -> dt, dt -> new Schema.SchemaBuilder()
+              .setSchemaName("testTable")
+              .setEnableColumnBasedNullHandling(true)
+              .addDimensionField("myField", dt, f -> f.setNullable(true))
+              .build()));
+
+  protected static final Map<FieldSpec.DataType, Schema> TWO_FIELDS_NULLABLE_DIMENSION_SCHEMAS =
+      Arrays.stream(VALID_DATA_TYPES)
+          .collect(Collectors.toMap(dt -> dt, dt -> new Schema.SchemaBuilder()
+              .setSchemaName("testTable2")
+              .setEnableColumnBasedNullHandling(true)
+              .addDimensionField("field1", dt, f -> f.setNullable(true))
+              .addDimensionField("field2", dt, f -> f.setNullable(true))
+              .build()));
+
+  protected static final TableConfig SINGLE_FIELD_TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE)
+      .setTableName("testTable")
+      .build();
+
+  protected static final TableConfig TWO_FIELDS_TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE)
+      .setTableName("testTable")
+      .build();
+
+  protected File _baseDir;
+
+  @BeforeClass
+  void createBaseDir() {
+    try {
+      _baseDir = Files.createTempDirectory(getClass().getSimpleName()).toFile();
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  @AfterClass
+  void destroyBaseDir()
+      throws IOException {
+    if (_baseDir != null) {
+      FileUtils.deleteDirectory(_baseDir);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 9a55c8aa65..3491f5c43b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -298,6 +298,8 @@ public class LeafOperator extends MultiStageOperator {
           case NUM_CONSUMING_SEGMENTS_MATCHED:
            _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue()));
             break;
+          case SORTED:
+            break;
           default: {
            throw new IllegalArgumentException("Unhandled V1 execution stat: " + entry.getKey());
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
