This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0811d3e3609 Create a DescDocIdSetOperator and improve order by desc
when columns are sorted (#16789)
0811d3e3609 is described below
commit 0811d3e3609d8e4f92c2cf13789886b67573554a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Sep 23 16:03:11 2025 +0200
Create a DescDocIdSetOperator and improve order by desc when columns are
sorted (#16789)
---
.../common/utils/config/QueryOptionsUtils.java | 13 +-
.../pinot/core/operator/BaseDocIdSetOperator.java | 64 +++++++
.../pinot/core/operator/BaseProjectOperator.java | 12 +-
.../core/operator/BitmapDocIdSetOperator.java | 73 ++++++--
...ocIdSetBlock.java => DocIdOrderedOperator.java} | 35 ++--
.../pinot/core/operator/DocIdSetOperator.java | 18 +-
...rator.java => IntIteratorDocIdSetOperator.java} | 53 +++---
.../pinot/core/operator/ProjectionOperator.java | 32 +++-
.../core/operator/ProjectionOperatorUtils.java | 7 +-
...tOperator.java => ReverseDocIdSetOperator.java} | 67 ++++---
.../pinot/core/operator/blocks/DocIdSetBlock.java | 11 +-
.../ExpressionScanDocIdIterator.java | 25 ++-
.../operator/query/SelectionOrderByOperator.java | 4 +-
.../SelectionPartiallyOrderedByDescOperation.java | 6 +-
...SelectionPartiallyOrderedByLinearOperator.java} | 31 ++--
.../core/operator/transform/TransformOperator.java | 17 ++
.../apache/pinot/core/plan/DocIdSetPlanNode.java | 7 +-
.../org/apache/pinot/core/plan/FilterPlanNode.java | 17 +-
.../apache/pinot/core/plan/ProjectPlanNode.java | 12 +-
.../apache/pinot/core/plan/SelectionPlanNode.java | 61 +++---
.../combine/SelectionCombineOperatorTest.java | 95 ++++++++--
.../DefaultAggregationExecutorTest.java | 3 +-
...nnerSegmentSelectionSingleValueQueriesTest.java | 205 +++++++++++++++------
.../queries/TextMatchTransformFunctionTest.java | 3 +-
.../apache/pinot/perf/BenchmarkOrderByQueries.java | 167 +++++++----------
.../forward/FixedBitSVForwardIndexReaderV2.java | 17 ++
.../apache/pinot/spi/utils/CommonConstants.java | 3 +-
.../apache/pinot/tools/SortedColumnQuickstart.java | 187 +++++++++++++++++++
.../examples/batch/sorted/ingestionJobSpec.yaml | 124 +++++++++++++
29 files changed, 1030 insertions(+), 339 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 0ee27b145ed..a205f798daa 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -346,11 +346,6 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.FILTERED_AGGREGATIONS_SKIP_EMPTY_GROUPS));
}
- @Nullable
- public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
- return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
- }
-
@Nullable
public static Integer getMultiStageLeafLimit(Map<String, String>
queryOptions) {
String maxLeafLimitStr =
queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
@@ -582,4 +577,12 @@ public class QueryOptionsUtils {
public static String getWorkloadName(Map<String, String> queryOptions) {
return queryOptions.getOrDefault(QueryOptionKey.WORKLOAD_NAME,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
}
+
+ public static boolean isReverseOrderAllowed(Map<String, String>
queryOptions) {
+ String value = queryOptions.get(QueryOptionKey.ALLOW_REVERSE_ORDER);
+ if (value == null) {
+ return QueryOptionKey.DEFAULT_ALLOW_REVERSE_ORDER;
+ }
+ return Boolean.parseBoolean(value);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
new file mode 100644
index 00000000000..cefed0c67d4
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseDocIdSetOperator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
+
+
+/// The base class for operators that produce [DocIdSetBlock].
+///
+/// These operators are the intermediate between
+/// [filter
operators][org.apache.pinot.core.operator.filter.BaseFilterOperator] and
+/// [projection operators][org.apache.pinot.core.operator.ProjectionOperator].
+///
+/// Filter operators return a
[org.apache.pinot.core.operator.blocks.FilterBlock], whose method
+///
[getBlockDocIdSet()][org.apache.pinot.core.operator.blocks.FilterBlock#getBlockDocIdSet()]
is used
+/// to build different types of [BaseDocIdSetOperator]s (e.g.
[DocIdSetOperator]). Contrary to filter operators,
+/// whose
[nextBlock()][org.apache.pinot.core.operator.filter.BaseFilterOperator#nextBlock()]
method returns always
+/// the same block (which contains all the matched document ids for the
segment),
+/// **DocIdSetOperator[.nextBlock()][BaseDocIdSetOperator#nextBlock()] splits
the segment into multiple blocks and
+/// therefore must be called multiple times until it returns `null`**.
+///
+/// The blocks returned by [BaseDocIdSetOperator] can be and usually are
returned in a given order. Most of the time, the
+/// order is ascending, which means that the blocks and the rows inside them
are sorted by their document ids in
+/// ascending order, but may be changed depending on the query. For example
the optimizer may decide to configure
+/// operators to emit rows in descending order to optimize the performance of
top-N queries on segments sorted in
+/// ascending order.
+public abstract class BaseDocIdSetOperator extends BaseOperator<DocIdSetBlock>
+ implements DocIdOrderedOperator<DocIdSetBlock> {
+
+  /// Returns a [BaseDocIdSetOperator] that is compatible with the requested
order or fails with
+  /// [UnsupportedOperationException] if the order is not supported for this
operator.
+ ///
+ /// It may return `this` if the order is already the requested one.
+ ///
+  /// @return a [BaseDocIdSetOperator] ordered according to `order`. Remember
that
+  ///     an operator may be both ascending and descending if it is empty.
+ /// @throws UnsupportedOperationException if the order is not supported for
this operator.
+ public abstract BaseDocIdSetOperator withOrder(DocIdOrder order) throws
UnsupportedOperationException;
+
+ /// Returns the next block of document ids that match the filter (if any).
+ ///
+  /// Each time this method is called, it returns the next block of document
ids, or `null` if there are no more
+  /// blocks.
+ @Override
+ @Nullable
+ protected abstract DocIdSetBlock getNextBlock();
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
index aaaa8cc4155..4c6d96f67e1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseProjectOperator.java
@@ -23,7 +23,8 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.operator.blocks.ValueBlock;
-public abstract class BaseProjectOperator<T extends ValueBlock> extends
BaseOperator<T> {
+public abstract class BaseProjectOperator<T extends ValueBlock> extends
BaseOperator<T>
+ implements DocIdOrderedOperator<T> {
/**
* Returns a map from source column name to context.
@@ -41,4 +42,13 @@ public abstract class BaseProjectOperator<T extends
ValueBlock> extends BaseOper
public int getNumColumnsProjected() {
return getSourceColumnContextMap().size();
}
+
+ /**
+ * Returns an instance of {@link BaseProjectOperator} with the given order.
+ *
+ * The instance can be the same as the current one if requested order is the
same as the current one.
+ *
+ * @throws UnsupportedOperationException if the order cannot be changed
+ */
+ public abstract BaseProjectOperator<T> withOrder(DocIdOrder newOrder);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
index 4f203599c1e..549eda9e1bf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
@@ -32,42 +32,61 @@ import org.roaringbitmap.IntIterator;
* <p>Should call {@link #nextBlock()} multiple times until it returns
<code>null</code> (already exhausts all the
* documents) or already gathered enough documents (for selection queries).
*/
-public class BitmapDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
-
+public class BitmapDocIdSetOperator extends BaseDocIdSetOperator {
private static final String EXPLAIN_NAME = "DOC_ID_SET_BITMAP";
+ private final ImmutableBitmapDataProvider _docIds;
+ private final int[] _docIdBuffer;
+ private final DocIdOrder _docIdOrder;
+
// TODO: Consider using BatchIterator to fill the document ids. Currently
BatchIterator only reads bits for one
// container instead of trying to fill up the buffer with bits from
multiple containers. If in the future
// BatchIterator provides an API to fill up the buffer, switch to
BatchIterator.
- private final IntIterator _intIterator;
- private final int[] _docIdBuffer;
+ private IntIterator _docIdIterator;
+
+ public BitmapDocIdSetOperator(ImmutableBitmapDataProvider docIds, int[]
docIdBuffer, DocIdOrder docIdOrder) {
+ _docIds = docIds;
+ _docIdBuffer = docIdBuffer;
+ _docIdOrder = docIdOrder;
+ }
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ public BitmapDocIdSetOperator(IntIterator docIdIterator, int[] docIdBuffer,
DocIdOrder docIdOrder) {
+ _docIds = null;
+ _docIdIterator = docIdIterator;
+ _docIdBuffer = docIdBuffer;
+ _docIdOrder = docIdOrder;
}
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int
numDocs) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = new int[Math.min(numDocs,
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
+ public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider
docIds) {
+ return ascending(docIds, new int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
}
- public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
- _intIterator = intIterator;
- _docIdBuffer = docIdBuffer;
+ public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider
docIds, int numDocs) {
+ return ascending(docIds, new int[Math.min(numDocs,
DocIdSetPlanNode.MAX_DOC_PER_CALL)]);
}
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[]
docIdBuffer) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = docIdBuffer;
+ public static BitmapDocIdSetOperator ascending(ImmutableBitmapDataProvider
docIds, int[] docIdBuffer) {
+ return new BitmapDocIdSetOperator(docIds, docIdBuffer, DocIdOrder.ASC);
+ }
+
+ public static BitmapDocIdSetOperator descending(ImmutableBitmapDataProvider
docIds, int numDocs) {
+ return descending(docIds, new int[Math.min(numDocs,
DocIdSetPlanNode.MAX_DOC_PER_CALL)]);
+ }
+
+ public static BitmapDocIdSetOperator descending(ImmutableBitmapDataProvider
bitmap, int[] docIdBuffer) {
+ return new BitmapDocIdSetOperator(bitmap, docIdBuffer, DocIdOrder.DESC);
}
@Override
protected DocIdSetBlock getNextBlock() {
+ if (_docIdIterator == null) {
+ assert _docIds != null;
+ _docIdIterator = _docIdOrder == DocIdOrder.ASC ?
_docIds.getIntIterator() : _docIds.getReverseIntIterator();
+ }
int bufferSize = _docIdBuffer.length;
int index = 0;
- while (index < bufferSize && _intIterator.hasNext()) {
- _docIdBuffer[index++] = _intIterator.next();
+ while (index < bufferSize && _docIdIterator.hasNext()) {
+ _docIdBuffer[index++] = _docIdIterator.next();
}
if (index > 0) {
return new DocIdSetBlock(_docIdBuffer, index);
@@ -76,7 +95,6 @@ public class BitmapDocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
}
}
-
@Override
public String toExplainString() {
return EXPLAIN_NAME;
@@ -86,4 +104,21 @@ public class BitmapDocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
public List<Operator> getChildOperators() {
return Collections.emptyList();
}
+
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return _docIdOrder == order;
+ }
+
+ @Override
+ public BaseDocIdSetOperator withOrder(DocIdOrder order)
+ throws UnsupportedOperationException {
+ if (isCompatibleWith(order)) {
+ return this;
+ }
+ if (_docIds == null) {
+ throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support
changing its order");
+ }
+ return new BitmapDocIdSetOperator(_docIds, _docIdBuffer, order);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
similarity index 55%
copy from
pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
index d4bb1562769..5ec60a26995 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdOrderedOperator.java
@@ -16,30 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.operator.blocks;
+package org.apache.pinot.core.operator;
import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.common.Operator;
-/**
- * The {@code DocIdSetBlock} contains a block of document ids (sorted), and is
returned from {@link DocIdSetOperator}.
- * Each {@code DocIdSetOperator} can return multiple {@code DocIdSetBlock}s.
- */
-public class DocIdSetBlock implements Block {
- private final int[] _docIds;
- private final int _length;
+/// An operator whose emitted rows follow a well-defined docId order.
+public interface DocIdOrderedOperator<T extends Block> extends Operator<T> {
+ /// Returns true if the operator is ordered by docId in the specified order.
+ ///
+ /// Remember that empty operators or operators that return a single row are
considered ordered.
+ boolean isCompatibleWith(DocIdOrder order);
- public DocIdSetBlock(int[] docIds, int length) {
- _docIds = docIds;
- _length = length;
- }
-
- public int[] getDocIds() {
- return _docIds;
- }
+ enum DocIdOrder {
+ /// The rows are sorted in strictly ascending docId order.
+ ASC,
+ /// The rows are sorted in strictly descending docId order.
+ DESC;
- public int getLength() {
- return _length;
+ public static DocIdOrder fromAsc(boolean asc) {
+ return asc ? ASC : DESC;
+ }
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index 706de6d4fe4..b8fd1a47380 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -32,11 +32,12 @@ import org.apache.pinot.spi.trace.Tracing;
/**
- * The <code>DocIdSetOperator</code> takes a filter operator and returns
blocks with set of the matched document Ids.
+ * The <code>AscendingDocIdSetOperator</code> takes a filter operator and
returns blocks with set of the matched
+ * document Ids.
* <p>Should call {@link #nextBlock()} multiple times until it returns
<code>null</code> (already exhausts all the
* matched documents) or already gathered enough documents (for selection
queries).
*/
-public class DocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+public class DocIdSetOperator extends BaseDocIdSetOperator {
private static final String EXPLAIN_NAME = "DOC_ID_SET";
private static final ThreadLocal<int[]> THREAD_LOCAL_DOC_IDS =
@@ -106,4 +107,17 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
long numEntriesScannedInFilter = _blockDocIdSet != null ?
_blockDocIdSet.getNumEntriesScannedInFilter() : 0;
return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
}
+
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return order == DocIdOrder.ASC;
+ }
+
+ @Override
+ public BaseDocIdSetOperator withOrder(DocIdOrder order) {
+ if (isCompatibleWith(order)) {
+ return this;
+ }
+ return new ReverseDocIdSetOperator(_filterOperator, _maxSizeOfDocIdSet);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
similarity index 59%
copy from
pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
index 4f203599c1e..4f1da707e50 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/IntIteratorDocIdSetOperator.java
@@ -18,51 +18,46 @@
*/
package org.apache.pinot.core.operator;
-import java.util.Collections;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
-import org.roaringbitmap.ImmutableBitmapDataProvider;
import org.roaringbitmap.IntIterator;
-/**
- * The <code>BitmapDocIdSetOperator</code> takes a bitmap of document ids and
returns blocks of document ids.
- * <p>Should call {@link #nextBlock()} multiple times until it returns
<code>null</code> (already exhausts all the
- * documents) or already gathered enough documents (for selection queries).
- */
-public class BitmapDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+public class IntIteratorDocIdSetOperator extends BaseDocIdSetOperator {
- private static final String EXPLAIN_NAME = "DOC_ID_SET_BITMAP";
+ private static final String EXPLAIN_NAME = "DOC_ID_SET_ITERATOR";
// TODO: Consider using BatchIterator to fill the document ids. Currently
BatchIterator only reads bits for one
// container instead of trying to fill up the buffer with bits from
multiple containers. If in the future
// BatchIterator provides an API to fill up the buffer, switch to
BatchIterator.
private final IntIterator _intIterator;
private final int[] _docIdBuffer;
+ private final DocIdOrder _docIdOrder;
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
- }
-
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int
numDocs) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = new int[Math.min(numDocs,
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
- }
-
- public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
+ public IntIteratorDocIdSetOperator(IntIterator intIterator, int[]
docIdBuffer, DocIdOrder docIdOrder) {
_intIterator = intIterator;
_docIdBuffer = docIdBuffer;
+ _docIdOrder = docIdOrder;
}
- public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[]
docIdBuffer) {
- _intIterator = bitmap.getIntIterator();
- _docIdBuffer = docIdBuffer;
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return _docIdOrder == order;
}
@Override
+ public BaseDocIdSetOperator withOrder(DocIdOrder order)
+ throws UnsupportedOperationException {
+ if (_docIdOrder != order) {
+ throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support
changing its order");
+ }
+ return this;
+ }
+
+ @Override
+ @Nullable
protected DocIdSetBlock getNextBlock() {
int bufferSize = _docIdBuffer.length;
int index = 0;
@@ -76,14 +71,14 @@ public class BitmapDocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
}
}
-
@Override
- public String toExplainString() {
- return EXPLAIN_NAME;
+ public List<? extends Operator> getChildOperators() {
+ return List.of();
}
+ @Nullable
@Override
- public List<Operator> getChildOperators() {
- return Collections.emptyList();
+ public String toExplainString() {
+ return EXPLAIN_NAME;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
index b3e76028dbc..85b07166d27 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
@@ -40,14 +40,14 @@ public class ProjectionOperator extends
BaseProjectOperator<ProjectionBlock> imp
protected static final String EXPLAIN_NAME = "PROJECT";
protected final Map<String, DataSource> _dataSourceMap;
- protected final BaseOperator<DocIdSetBlock> _docIdSetOperator;
+ protected final BaseDocIdSetOperator _docIdSetOperator;
protected final DataFetcher _dataFetcher;
protected final DataBlockCache _dataBlockCache;
protected final Map<String, ColumnContext> _columnContextMap;
protected final QueryContext _queryContext;
public ProjectionOperator(Map<String, DataSource> dataSourceMap,
- @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext
queryContext) {
+ @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext
queryContext) {
_dataSourceMap = dataSourceMap;
_docIdSetOperator = docIdSetOperator;
_dataFetcher = new DataFetcher(dataSourceMap);
@@ -58,6 +58,17 @@ public class ProjectionOperator extends
BaseProjectOperator<ProjectionBlock> imp
_queryContext = queryContext;
}
+ private ProjectionOperator(Map<String, DataSource> dataSourceMap,
+ @Nullable BaseDocIdSetOperator docIdSetOperator, DataFetcher
dataFetcher, DataBlockCache dataBlockCache,
+ Map<String, ColumnContext> columnContextMap, QueryContext queryContext) {
+ _dataSourceMap = dataSourceMap;
+ _docIdSetOperator = docIdSetOperator;
+ _dataFetcher = dataFetcher;
+ _dataBlockCache = dataBlockCache;
+ _columnContextMap = columnContextMap;
+ _queryContext = queryContext;
+ }
+
@Override
public Map<String, ColumnContext> getSourceColumnContextMap() {
return _columnContextMap;
@@ -93,7 +104,7 @@ public class ProjectionOperator extends
BaseProjectOperator<ProjectionBlock> imp
public String toExplainString() {
StringBuilder stringBuilder = new StringBuilder(EXPLAIN_NAME).append('(');
// SQL statements such as SELECT 'literal' FROM myTable don't have any
projection columns.
- if (!_dataSourceMap.keySet().isEmpty()) {
+ if (!_dataSourceMap.isEmpty()) {
int count = 0;
for (String col : _dataSourceMap.keySet()) {
if (count == _dataSourceMap.keySet().size() - 1) {
@@ -123,6 +134,21 @@ public class ProjectionOperator extends
BaseProjectOperator<ProjectionBlock> imp
return _docIdSetOperator != null ?
_docIdSetOperator.getExecutionStatistics() : new ExecutionStatistics(0, 0, 0,
0);
}
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return _docIdSetOperator == null ||
_docIdSetOperator.isCompatibleWith(order);
+ }
+
+ @Override
+ public BaseProjectOperator<ProjectionBlock> withOrder(DocIdOrder newOrder) {
+ BaseDocIdSetOperator orderedOperator =
_docIdSetOperator.withOrder(newOrder);
+ if (orderedOperator == _docIdSetOperator) {
+ return this;
+ }
+ return new ProjectionOperator(_dataSourceMap, orderedOperator,
_dataFetcher, _dataBlockCache, _columnContextMap,
+ _queryContext);
+ }
+
@Override
public void close() {
_dataBlockCache.close();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
index 151a597a18d..508be8520a6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperatorUtils.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.operator;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -36,7 +35,7 @@ public class ProjectionOperatorUtils {
}
public static ProjectionOperator getProjectionOperator(Map<String,
DataSource> dataSourceMap,
- @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext
queryContext) {
+ @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext
queryContext) {
return _instance.getProjectionOperator(dataSourceMap, docIdSetOperator,
queryContext);
}
@@ -45,13 +44,13 @@ public class ProjectionOperatorUtils {
* Returns the projection operator
*/
ProjectionOperator getProjectionOperator(Map<String, DataSource>
dataSourceMap,
- @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext
queryContext);
+ @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext
queryContext);
}
public static class DefaultImplementation implements Implementation {
@Override
public ProjectionOperator getProjectionOperator(Map<String, DataSource>
dataSourceMap,
- @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator, QueryContext
queryContext) {
+ @Nullable BaseDocIdSetOperator docIdSetOperator, QueryContext
queryContext) {
return new ProjectionOperator(dataSourceMap, docIdSetOperator,
queryContext);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
similarity index 62%
copy from
pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
index 706de6d4fe4..7ecd45e3a5f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/ReverseDocIdSetOperator.java
@@ -19,25 +19,23 @@
package org.apache.pinot.core.operator;
import com.google.common.base.Preconditions;
-import java.util.Collections;
import java.util.List;
import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
+import org.apache.pinot.core.operator.dociditerators.BitmapBasedDocIdIterator;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.trace.Tracing;
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
-/**
- * The <code>DocIdSetOperator</code> takes a filter operator and returns
blocks with set of the matched document Ids.
- * <p>Should call {@link #nextBlock()} multiple times until it returns
<code>null</code> (already exhausts all the
- * matched documents) or already gathered enough documents (for selection
queries).
- */
-public class DocIdSetOperator extends BaseOperator<DocIdSetBlock> {
- private static final String EXPLAIN_NAME = "DOC_ID_SET";
+public class ReverseDocIdSetOperator extends BaseDocIdSetOperator {
+ private static final String EXPLAIN_NAME = "REVERSE_DOC_ID_SET";
private static final ThreadLocal<int[]> THREAD_LOCAL_DOC_IDS =
ThreadLocal.withInitial(() -> new
int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
@@ -46,10 +44,10 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
private final int _maxSizeOfDocIdSet;
private BlockDocIdSet _blockDocIdSet;
- private BlockDocIdIterator _blockDocIdIterator;
private int _currentDocId = 0;
+ private IntIterator _reverseIterator;
- public DocIdSetOperator(BaseFilterOperator filterOperator, int
maxSizeOfDocIdSet) {
+ public ReverseDocIdSetOperator(BaseFilterOperator filterOperator, int
maxSizeOfDocIdSet) {
Preconditions.checkArgument(maxSizeOfDocIdSet > 0 && maxSizeOfDocIdSet <=
DocIdSetPlanNode.MAX_DOC_PER_CALL);
_filterOperator = filterOperator;
_maxSizeOfDocIdSet = maxSizeOfDocIdSet;
@@ -57,25 +55,20 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
@Override
protected DocIdSetBlock getNextBlock() {
- if (_currentDocId == Constants.EOF) {
- return null;
+ if (_reverseIterator == null) {
+ initializeBitmap();
}
- // Initialize filter block document Id set
- if (_blockDocIdSet == null) {
- _blockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
- _blockDocIdIterator = _blockDocIdSet.iterator();
+ if (_currentDocId == Constants.EOF) {
+ return null;
}
Tracing.ThreadAccountantOps.sample();
int pos = 0;
int[] docIds = THREAD_LOCAL_DOC_IDS.get();
- for (int i = 0; i < _maxSizeOfDocIdSet; i++) {
- _currentDocId = _blockDocIdIterator.next();
- if (_currentDocId == Constants.EOF) {
- break;
- }
+ for (int i = 0; i < _maxSizeOfDocIdSet && _reverseIterator.hasNext(); i++)
{
+ _currentDocId = _reverseIterator.next();
docIds[pos++] = _currentDocId;
}
if (pos > 0) {
@@ -85,6 +78,22 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
}
}
+ private void initializeBitmap() {
+ _blockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
+ BlockDocIdIterator iterator = _blockDocIdSet.iterator();
+ if (iterator instanceof BitmapBasedDocIdIterator) {
+ _reverseIterator = ((BitmapBasedDocIdIterator)
iterator).getDocIds().getReverseIntIterator();
+ } else {
+ RoaringBitmapWriter<RoaringBitmap> writer =
RoaringBitmapWriter.writer().get();
+ int docId = iterator.next();
+ while (docId != Constants.EOF) {
+ writer.add(docId);
+ docId = iterator.next();
+ }
+ _reverseIterator = writer.get().getReverseIntIterator();
+ }
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
@@ -92,7 +101,7 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
@Override
public List<Operator> getChildOperators() {
- return Collections.singletonList(_filterOperator);
+ return List.of(_filterOperator);
}
@Override
@@ -106,4 +115,18 @@ public class DocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
long numEntriesScannedInFilter = _blockDocIdSet != null ?
_blockDocIdSet.getNumEntriesScannedInFilter() : 0;
return new ExecutionStatistics(0, numEntriesScannedInFilter, 0, 0);
}
+
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return order == DocIdOrder.DESC;
+ }
+
+ @Override
+ public BaseDocIdSetOperator withOrder(DocIdOrder order)
+ throws UnsupportedOperationException {
+ if (isCompatibleWith(order)) {
+ return this;
+ }
+ return new DocIdSetOperator(_filterOperator, _maxSizeOfDocIdSet);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
index d4bb1562769..af31e1520f5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/DocIdSetBlock.java
@@ -19,12 +19,17 @@
package org.apache.pinot.core.operator.blocks;
import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.operator.DocIdSetOperator;
+import org.apache.pinot.core.operator.BaseDocIdSetOperator;
/**
- * The {@code DocIdSetBlock} contains a block of document ids (sorted), and is
returned from {@link DocIdSetOperator}.
- * Each {@code DocIdSetOperator} can return multiple {@code DocIdSetBlock}s.
+ * The {@code DocIdSetBlock} contains a block of document ids and is returned
from {@link BaseDocIdSetOperator}.
+ *
+ * Each {@code BaseDocIdSetOperator} can return multiple {@code
DocIdSetBlock}s and each block contains an array of
+ * document ids.
+ *
+ * Do not confuse this class with {@link
org.apache.pinot.core.common.BlockDocIdSet}, which is returned by
+ * the filter operator and contains a set of document ids that can be iterated
through.
*/
public class DocIdSetBlock implements Block {
private final int[] _docIds;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 03d16f15e8a..d64bcdc6128 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -25,8 +25,9 @@ import java.util.Map;
import java.util.OptionalInt;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.BaseDocIdSetOperator;
import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.core.operator.ProjectionOperatorUtils;
import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
@@ -127,8 +128,10 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
@Override
public MutableRoaringBitmap applyAnd(BatchIterator batchIterator,
OptionalInt firstDoc, OptionalInt lastDoc) {
IntIterator intIterator = batchIterator.asIntIterator(new
int[OPTIMAL_ITERATOR_BATCH_SIZE]);
+ BaseDocIdSetOperator docIdSetOperator =
+ new BitmapDocIdSetOperator(intIterator, _docIdBuffer,
DocIdOrderedOperator.DocIdOrder.ASC);
try (ProjectionOperator projectionOperator =
ProjectionOperatorUtils.getProjectionOperator(_dataSourceMap,
- new BitmapDocIdSetOperator(intIterator, _docIdBuffer), _queryContext))
{
+ docIdSetOperator, _queryContext)) {
MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
ProjectionBlock projectionBlock;
while ((projectionBlock = projectionOperator.nextBlock()) != null) {
@@ -141,7 +144,7 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
@Override
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
try (ProjectionOperator projectionOperator =
ProjectionOperatorUtils.getProjectionOperator(_dataSourceMap,
- new BitmapDocIdSetOperator(docIds, _docIdBuffer), _queryContext)) {
+ new BitmapDocIdSetOperator(docIds, _docIdBuffer,
DocIdOrderedOperator.DocIdOrder.ASC), _queryContext)) {
MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
ProjectionBlock projectionBlock;
while ((projectionBlock = projectionOperator.nextBlock()) != null) {
@@ -420,7 +423,7 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
/**
* NOTE: This operator contains only one block.
*/
- private class RangeDocIdSetOperator extends BaseOperator<DocIdSetBlock> {
+ private class RangeDocIdSetOperator extends BaseDocIdSetOperator {
static final String EXPLAIN_NAME = "DOC_ID_SET_RANGE";
DocIdSetBlock _docIdSetBlock;
@@ -449,6 +452,20 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
public List<Operator> getChildOperators() {
return Collections.emptyList();
}
+
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return DocIdOrder.ASC == order;
+ }
+
+ @Override
+ public BaseDocIdSetOperator withOrder(DocIdOrder order)
+ throws UnsupportedOperationException {
+ if (order == DocIdOrder.ASC) {
+ return this;
+ }
+ throw new UnsupportedOperationException(EXPLAIN_NAME + " doesn't support
descending order");
+ }
}
public enum PredicateEvaluationResult {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index d77d4cfaf4b..5db3e914930 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -282,9 +282,9 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
dataSourceMap.put(column, _indexSegment.getDataSource(column,
_queryContext.getSchema()));
}
+ BitmapDocIdSetOperator docIdOperator =
BitmapDocIdSetOperator.ascending(docIds, numRows);
try (ProjectionOperator projectionOperator =
- ProjectionOperatorUtils.getProjectionOperator(dataSourceMap, new
BitmapDocIdSetOperator(docIds, numRows),
- _queryContext)) {
+ ProjectionOperatorUtils.getProjectionOperator(dataSourceMap,
docIdOperator, _queryContext)) {
TransformOperator transformOperator =
new TransformOperator(_queryContext, projectionOperator,
nonOrderByExpressions);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
index be5a9b3fb25..fb318d85be3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByDescOperation.java
@@ -48,8 +48,10 @@ public class SelectionPartiallyOrderedByDescOperation
extends LinearSelectionOrd
super(indexSegment, queryContext, expressions, projectOperator,
numSortedExpressions);
assert queryContext.getOrderByExpressions() != null;
Preconditions.checkArgument(!queryContext.getOrderByExpressions().stream()
- .filter(expr -> expr.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER).findFirst()
- .orElseThrow(() -> new IllegalArgumentException("The query is not
order by identifiers")).isAsc(),
+ .filter(expr -> expr.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("The query is not
order by identifiers"))
+ .isAsc(),
"%s can only be used when the first column in order by is DESC",
EXPLAIN_NAME);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
similarity index 66%
rename from
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
index ff4c1c8654e..eaedac0003c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByAscOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByLinearOperator.java
@@ -18,37 +18,46 @@
*/
package org.apache.pinot.core.operator.query;
-import com.google.common.base.Preconditions;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
/**
- * An operator for order-by queries ASC that are partially sorted over the
sorting keys.
+ * The operator used when selecting with order-by in a way that the segment
layout can be used to prune results.
+ *
+ * This requires that the first order-by expression is on a column that is
sorted in the segment and:
+ * 1. The order-by is ASC
+ * 2. The order-by is DESC and the input operators support descending
iteration (e.g. full scan)
+ *
* @see LinearSelectionOrderByOperator
*/
-public class SelectionPartiallyOrderedByAscOperator extends
LinearSelectionOrderByOperator {
+public class SelectionPartiallyOrderedByLinearOperator extends
LinearSelectionOrderByOperator {
- private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_ASC";
+ private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDER_BY_LINEAR";
private int _numDocsScanned = 0;
- public SelectionPartiallyOrderedByAscOperator(IndexSegment indexSegment,
QueryContext queryContext,
+ public SelectionPartiallyOrderedByLinearOperator(IndexSegment indexSegment,
QueryContext queryContext,
List<ExpressionContext> expressions, BaseProjectOperator<?>
projectOperator, int numSortedExpressions) {
super(indexSegment, queryContext, expressions, projectOperator,
numSortedExpressions);
- Preconditions.checkArgument(queryContext.getOrderByExpressions().stream()
- .filter(expr -> expr.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER)
- .findFirst()
- .orElseThrow(() -> new IllegalArgumentException("The query is not
order by identifiers"))
- .isAsc(),
- "%s can only be used when the first column in order by is ASC",
EXPLAIN_NAME);
+ boolean firstColIsAsc = queryContext.getOrderByExpressions().stream()
+ .filter(expr -> expr.getExpression().getType() ==
ExpressionContext.Type.IDENTIFIER)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("The query is not
order by identifiers"))
+ .isAsc();
+ DocIdOrderedOperator.DocIdOrder docIdOrder =
DocIdOrderedOperator.DocIdOrder.fromAsc(firstColIsAsc);
+ if (!projectOperator.isCompatibleWith(docIdOrder)) {
+ throw new IllegalStateException(EXPLAIN_NAME + " requires the input
operator to be compatible with order: "
+ + docIdOrder + ", but found: " + projectOperator.toExplainString());
+ }
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index d4d003ae038..78418e2fdba 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -60,6 +60,13 @@ public class TransformOperator extends
BaseProjectOperator<TransformBlock> {
}
}
+ private TransformOperator(
+ BaseProjectOperator<?> projectOperator,
+ Map<ExpressionContext, TransformFunction> transformFunctionMap) {
+ _projectOperator = projectOperator;
+ _transformFunctionMap = transformFunctionMap;
+ }
+
@Override
public Map<String, ColumnContext> getSourceColumnContextMap() {
return _projectOperator.getSourceColumnContextMap();
@@ -110,4 +117,14 @@ public class TransformOperator extends
BaseProjectOperator<TransformBlock> {
public ExecutionStatistics getExecutionStatistics() {
return _projectOperator.getExecutionStatistics();
}
+
+ @Override
+ public boolean isCompatibleWith(DocIdOrder order) {
+ return _projectOperator.isCompatibleWith(order);
+ }
+
+ @Override
+ public BaseProjectOperator<TransformBlock> withOrder(DocIdOrder newOrder) {
+ return new TransformOperator(_projectOperator.withOrder(newOrder),
_transformFunctionMap);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
index 0e3558014b5..1b7fcbef856 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.plan;
-import javax.annotation.Nullable;
import org.apache.pinot.core.operator.DocIdSetOperator;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -33,7 +32,7 @@ public class DocIdSetPlanNode implements PlanNode {
private final BaseFilterOperator _filterOperator;
public DocIdSetPlanNode(SegmentContext segmentContext, QueryContext
queryContext, int maxDocPerCall,
- @Nullable BaseFilterOperator filterOperator) {
+ BaseFilterOperator filterOperator) {
assert maxDocPerCall > 0 && maxDocPerCall <= MAX_DOC_PER_CALL;
_segmentContext = segmentContext;
@@ -44,8 +43,6 @@ public class DocIdSetPlanNode implements PlanNode {
@Override
public DocIdSetOperator run() {
- return new DocIdSetOperator(
- _filterOperator != null ? _filterOperator : new
FilterPlanNode(_segmentContext, _queryContext).run(),
- _maxDocPerCall);
+ return new DocIdSetOperator(_filterOperator, _maxDocPerCall);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 238d83363ed..d7770a0b4a0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -214,10 +214,12 @@ public class FilterPlanNode implements PlanNode {
* Helper method to build the operator tree from the filter.
*/
private BaseFilterOperator constructPhysicalOperator(FilterContext filter,
int numDocs) {
+ List<FilterContext> childFilters;
+ List<BaseFilterOperator> childFilterOperators;
switch (filter.getType()) {
case AND:
- List<FilterContext> childFilters = filter.getChildren();
- List<BaseFilterOperator> childFilterOperators = new
ArrayList<>(childFilters.size());
+ childFilters = filter.getChildren();
+ childFilterOperators = new ArrayList<>(childFilters.size());
for (FilterContext childFilter : childFilters) {
BaseFilterOperator childFilterOperator =
constructPhysicalOperator(childFilter, numDocs);
if (childFilterOperator.isResultEmpty()) {
@@ -266,9 +268,10 @@ public class FilterPlanNode implements PlanNode {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column,
_queryContext.getSchema());
PredicateEvaluator predicateEvaluator;
+ TextIndexReader textIndexReader;
switch (predicate.getType()) {
case TEXT_CONTAINS:
- TextIndexReader textIndexReader = dataSource.getTextIndex();
+ textIndexReader = dataSource.getTextIndex();
if (!(textIndexReader instanceof NativeTextIndexReader)
&& !(textIndexReader instanceof NativeMutableTextIndex)) {
throw new UnsupportedOperationException("TEXT_CONTAINS is
supported only on native text index");
@@ -340,20 +343,22 @@ public class FilterPlanNode implements PlanNode {
Preconditions.checkState(vectorIndex != null,
"Cannot apply VECTOR_SIMILARITY on column: %s without vector
index", column);
return new VectorSimilarityFilterOperator(vectorIndex,
(VectorSimilarityPredicate) predicate, numDocs);
- case IS_NULL:
+ case IS_NULL: {
NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
if (nullValueVector != null) {
return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), false, numDocs);
} else {
return EmptyFilterOperator.getInstance();
}
- case IS_NOT_NULL:
- nullValueVector = dataSource.getNullValueVector();
+ }
+ case IS_NOT_NULL: {
+ NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
if (nullValueVector != null) {
return new
BitmapBasedFilterOperator(nullValueVector.getNullBitmap(), true, numDocs);
} else {
return new MatchAllFilterOperator(numDocs);
}
+ }
default:
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate,
dataSource, _queryContext);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
index a807b22417b..9aa7e1057f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.operator.BaseProjectOperator;
@@ -50,7 +49,7 @@ public class ProjectPlanNode implements PlanNode {
private final BaseFilterOperator _filterOperator;
public ProjectPlanNode(SegmentContext segmentContext, QueryContext
queryContext,
- Collection<ExpressionContext> expressions, int maxDocsPerCall, @Nullable
BaseFilterOperator filterOperator) {
+ Collection<ExpressionContext> expressions, int maxDocsPerCall,
BaseFilterOperator filterOperator) {
_indexSegment = segmentContext.getIndexSegment();
_segmentContext = segmentContext;
_queryContext = queryContext;
@@ -61,7 +60,8 @@ public class ProjectPlanNode implements PlanNode {
public ProjectPlanNode(SegmentContext segmentContext, QueryContext
queryContext,
Collection<ExpressionContext> expressions, int maxDocsPerCall) {
- this(segmentContext, queryContext, expressions, maxDocsPerCall, null);
+ this(segmentContext, queryContext, expressions, maxDocsPerCall,
+ new FilterPlanNode(segmentContext, queryContext).run());
}
@Override
@@ -78,9 +78,9 @@ public class ProjectPlanNode implements PlanNode {
projectionColumns.forEach(
column -> dataSourceMap.put(column,
_indexSegment.getDataSource(column, _queryContext.getSchema())));
// NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for
selection query with LIMIT 0)
- DocIdSetOperator docIdSetOperator =
- _maxDocsPerCall > 0 ? new DocIdSetPlanNode(_segmentContext,
_queryContext, _maxDocsPerCall,
- _filterOperator).run() : null;
+ DocIdSetOperator docIdSetOperator = _maxDocsPerCall > 0
+ ? new DocIdSetPlanNode(_segmentContext, _queryContext,
_maxDocsPerCall, _filterOperator).run()
+ : null;
// TODO: figure out a way to close this operator, as it may hold reader
context
ProjectionOperator projectionOperator =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index d59345e1846..fdb68d9866f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -20,18 +20,18 @@ package org.apache.pinot.core.plan;
import java.util.ArrayList;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.core.operator.DocIdOrderedOperator;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByLinearOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -79,24 +79,26 @@ public class SelectionPlanNode implements PlanNode {
// Although it is a break of abstraction, some code, specially merging,
assumes that if there is an order by
// expression the operator will return a block whose selection result is a
priority queue.
int sortedColumnsPrefixSize = getSortedColumnsPrefix(orderByExpressions,
_queryContext.isNullHandlingEnabled());
- OrderByAlgorithm orderByAlgorithm =
OrderByAlgorithm.fromQueryContext(_queryContext);
- if (sortedColumnsPrefixSize > 0 && orderByAlgorithm !=
OrderByAlgorithm.NAIVE) {
+ if (sortedColumnsPrefixSize > 0) {
int maxDocsPerCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
// The first order by expressions are sorted (either asc or desc).
// ie: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column
DESC LIMIT 10 OFFSET 5
// or: SELECT ... FROM Table WHERE predicates ORDER BY sorted_column,
not_sorted LIMIT 10 OFFSET 5
// but not SELECT ... FROM Table WHERE predicates ORDER BY not_sorted,
sorted_column LIMIT 10 OFFSET 5
- if (orderByExpressions.get(0).isAsc()) {
- if (sortedColumnsPrefixSize == orderByExpressions.size()) {
- maxDocsPerCall = Math.min(limit + _queryContext.getOffset(),
DocIdSetPlanNode.MAX_DOC_PER_CALL);
- }
- BaseProjectOperator<?> projectOperator =
- new ProjectPlanNode(_segmentContext, _queryContext, expressions,
maxDocsPerCall).run();
- return new SelectionPartiallyOrderedByAscOperator(_indexSegment,
_queryContext, expressions, projectOperator,
+
+ if (sortedColumnsPrefixSize == orderByExpressions.size()) {
+ maxDocsPerCall = Math.min(limit + _queryContext.getOffset(),
DocIdSetPlanNode.MAX_DOC_PER_CALL);
+ }
+
+ BaseProjectOperator<?> projectOperator = getSortedByProject(expressions,
maxDocsPerCall, orderByExpressions);
+ boolean asc = orderByExpressions.get(0).isAsc();
+ // Remember that we cannot use asc == projectOperator.isAscending()
because empty operators are considered
+ // both ascending and descending
+ DocIdOrderedOperator.DocIdOrder queryOrder =
DocIdOrderedOperator.DocIdOrder.fromAsc(asc);
+ if (projectOperator.isCompatibleWith(queryOrder)) {
+ return new SelectionPartiallyOrderedByLinearOperator(_indexSegment,
_queryContext, expressions, projectOperator,
sortedColumnsPrefixSize);
} else {
- BaseProjectOperator<?> projectOperator =
- new ProjectPlanNode(_segmentContext, _queryContext, expressions,
maxDocsPerCall).run();
return new SelectionPartiallyOrderedByDescOperation(_indexSegment,
_queryContext, expressions, projectOperator,
sortedColumnsPrefixSize);
}
@@ -120,6 +122,26 @@ public class SelectionPlanNode implements PlanNode {
return new SelectionOrderByOperator(_indexSegment, _queryContext,
expressions, projectOperator);
}
+ private BaseProjectOperator<?> getSortedByProject(List<ExpressionContext>
expressions, int maxDocsPerCall,
+ List<OrderByExpressionContext> orderByExpressions) {
+ BaseProjectOperator<?> projectOperator =
+ new ProjectPlanNode(_segmentContext, _queryContext, expressions,
maxDocsPerCall).run();
+
+ boolean asc = orderByExpressions.get(0).isAsc();
+ if (!asc
+ &&
QueryOptionsUtils.isReverseOrderAllowed(_queryContext.getQueryOptions())
+ &&
!projectOperator.isCompatibleWith(DocIdOrderedOperator.DocIdOrder.DESC)) {
+ try {
+ return projectOperator.withOrder(DocIdOrderedOperator.DocIdOrder.DESC);
+ } catch (IllegalArgumentException | UnsupportedOperationException e) {
+ // This happens when the operator cannot provide the required order
between blocks
+ // Fallback to SelectionOrderByOperator
+ return projectOperator;
+ }
+ }
+ return projectOperator;
+ }
+
/**
* This functions returns the number of expressions that are sorted by the
implicit order in the index.
*
@@ -174,17 +196,4 @@ public class SelectionPlanNode implements PlanNode {
}
}
}
-
- public enum OrderByAlgorithm {
- NAIVE;
-
- @Nullable
- public static OrderByAlgorithm fromQueryContext(QueryContext queryContext)
{
- String orderByAlgorithm =
QueryOptionsUtils.getOrderByAlgorithm(queryContext.getQueryOptions());
- if (orderByAlgorithm == null) {
- return null;
- }
- return OrderByAlgorithm.valueOf(orderByAlgorithm.toUpperCase());
- }
- }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index ef3c0e1e540..ed70023d1d6 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -114,7 +115,7 @@ public class SelectionCombineOperatorTest {
}
@Test
- public void testSelectionLimit0() {
+ public void selectionLimit0() {
SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM
testTable LIMIT 0");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
@@ -130,7 +131,7 @@ public class SelectionCombineOperatorTest {
}
@Test
- public void testSelectionOnly() {
+ public void selectionOnly() {
SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM
testTable");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
@@ -177,8 +178,10 @@ public class SelectionCombineOperatorTest {
}
@Test
- public void testSelectionOrderBy() {
- SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM
testTable ORDER BY intColumn");
+ public void selectionOrderByAscending() {
+ SelectionResultsBlock combineResult = getCombineResult(
+ "SET allowReverseOrder=false;"
+ + "SELECT * FROM testTable ORDER BY intColumn");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
List<Object[]> rows = combineResult.getRows();
@@ -198,11 +201,16 @@ public class SelectionCombineOperatorTest {
int numSegmentsMatched = combineResult.getNumSegmentsMatched();
assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
+ }
- combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC");
+ @Test
+ public void selectionOrderByDescending() {
+ SelectionResultsBlock combineResult = getCombineResult(
+ "SET allowReverseOrder=false; "
+ + "SELECT * FROM testTable ORDER BY intColumn DESC");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- rows = combineResult.getRows();
+ List<Object[]> rows = combineResult.getRows();
assertNotNull(rows);
assertEquals(rows.size(), 10);
int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
@@ -211,24 +219,85 @@ public class SelectionCombineOperatorTest {
}
// Should early-terminate after processing the result of the first
segment. Each thread should process at most 1
// segment.
- numDocsScanned = combineResult.getNumDocsScanned();
+ long numDocsScanned = combineResult.getNumDocsScanned();
assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT
- && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT);
+ && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT,
+ "Expected number of docs scanned to be between " +
NUM_RECORDS_PER_SEGMENT + " and "
+ + (QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY *
NUM_RECORDS_PER_SEGMENT) + ", but got: "
+ + numDocsScanned);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
- numSegmentsMatched = combineResult.getNumSegmentsMatched();
+ int numSegmentsMatched = combineResult.getNumSegmentsMatched();
assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
+ }
- combineResult = getCombineResult("SELECT * FROM testTable ORDER BY
intColumn DESC LIMIT 10000");
+ @Test
+ public void selectionOrderByDescendingWithLargeLimit() {
+ SelectionResultsBlock combineResult = getCombineResult(
+ "SET allowReverseOrder=false; "
+ + "SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- rows = combineResult.getRows();
+ List<Object[]> rows = combineResult.getRows();
assertNotNull(rows);
assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
// Should not early-terminate
- numDocsScanned = combineResult.getNumDocsScanned();
+ long numDocsScanned = combineResult.getNumDocsScanned();
+ assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
+ }
+
+ @Test
+ public void selectionOrderByDescendingWithReverseOrder() {
+ int limit = 10;
+ SelectionResultsBlock combineResult = getCombineResult(
+ "SET allowReverseOrder=true; "
+ + "SELECT * FROM testTable "
+ + "ORDER BY intColumn DESC "
+ + "LIMIT " + limit);
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ List<Object[]> rows = combineResult.getRows();
+ assertNotNull(rows);
+ assertEquals(rows.size(), 10);
+ int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
+ for (int i = 0; i < 10; i++) {
+ assertEquals((int) rows.get(i)[0], expectedValue - i);
+ }
+ // Should early-terminate after processing the result of the first
segment. Each thread should process at most 1
+ // segment.
+ long numDocsScanned = combineResult.getNumDocsScanned();
+ assertTrue(numDocsScanned >= limit
+ && numDocsScanned <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY * limit,
+ "Expected number of docs scanned to be between " + limit + " and "
+ + (QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY *
limit) + ", but got: "
+ + numDocsScanned);
+ assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
+ assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
+ assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS);
+ int numSegmentsMatched = combineResult.getNumSegmentsMatched();
+ assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <=
QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+ assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
+ }
+
+ @Test
+ public void selectionOrderByDescendingWithLargeLimitAndReverseOrder() {
+ SelectionResultsBlock combineResult = getCombineResult(
+ "SET allowReverseOrder=true; "
+ + "SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
+ assertEquals(combineResult.getDataSchema(),
+ new DataSchema(new String[]{INT_COLUMN}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
+ List<Object[]> rows = combineResult.getRows();
+ assertNotNull(rows);
+ assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ // Should not early-terminate
+ long numDocsScanned = combineResult.getNumDocsScanned();
assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
assertEquals(combineResult.getNumEntriesScannedInFilter(), 0);
assertEquals(combineResult.getNumEntriesScannedPostFilter(),
numDocsScanned);
@@ -237,7 +306,7 @@ public class SelectionCombineOperatorTest {
assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS *
NUM_RECORDS_PER_SEGMENT);
}
- private SelectionResultsBlock getCombineResult(String query) {
+ private SelectionResultsBlock getCombineResult(@Language("sql") String
query) {
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(query);
List<PlanNode> planNodes = new ArrayList<>(NUM_SEGMENTS);
for (IndexSegment indexSegment : _indexSegments) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
index ad1783aebff..ccc7f578f7d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutorTest.java
@@ -128,7 +128,8 @@ public class DefaultAggregationExecutorTest {
}
int totalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
MatchAllFilterOperator matchAllFilterOperator = new
MatchAllFilterOperator(totalDocs);
- DocIdSetOperator docIdSetOperator = new
DocIdSetOperator(matchAllFilterOperator, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+ DocIdSetOperator docIdSetOperator =
+ new DocIdSetOperator(matchAllFilterOperator,
DocIdSetPlanNode.MAX_DOC_PER_CALL);
ProjectionOperator projectionOperator =
new ProjectionOperator(dataSourceMap, docIdSetOperator, new
QueryContext.Builder().build());
TransformOperator transformOperator = new TransformOperator(_queryContext,
projectionOperator, expressions);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 2132d395705..333b8db5de2 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -28,8 +28,8 @@ import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
+import
org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByLinearOperator;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.testng.annotations.Test;
@@ -264,11 +264,10 @@ public class InnerSegmentSelectionSingleValueQueriesTest
extends BaseSingleValue
}
@Test
- public void testSelectionOrderBySortedColumn() {
- // Test query order by single sorted column in ascending order
+ public void testSelectionOrderBySingleSortedColumnAsc() {
String orderBy = " ORDER BY column5";
BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(SELECTION_QUERY + orderBy);
- assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -285,137 +284,231 @@ public class
InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
+ }
- // Test query order by single sorted column in descending order
- orderBy = " ORDER BY column5 DESC";
- selectionOrderByOperator = getOperator(SELECTION_QUERY + orderBy);
- assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ @Test
+ public void testSelectionOrderBySingleSortedColumnDescDidOrder() {
+ String orderBy = " ORDER BY column5 DESC";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator = getOperator(
+ "SET allowReverseOrder = false; " + SELECTION_QUERY + orderBy);
+    assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation, "Expected: "
+        + SelectionPartiallyOrderedByDescOperation.class + ", found: " +
selectionOrderByOperator.getClass());
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 30000 * (3 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column1", "column11"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.STRING});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
+ }
- // Test query order by all sorted columns in ascending order
+ @Test
+ public void testSelectionOrderBySingleSortedColumnDescReverseOrder() {
+ String orderBy = " ORDER BY column5 DESC";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator = getOperator(
+ "SET allowReverseOrder = true; " + SELECTION_QUERY + orderBy);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator, "Expected: "
+ + SelectionPartiallyOrderedByLinearOperator.class + ", found: " +
selectionOrderByOperator.getClass());
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 10 * (3 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column1", "column11"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.STRING});
+ List<Object[]> selectionResult = resultsBlock.getRows();
+ assertEquals(selectionResult.size(), 10);
+ Object[] lastRow = selectionResult.get(9);
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
+ }
+
+ @Test
+ public void testSelectionOrderByAllSortedColumnsAsc() {
String query = "SELECT column5, daysSinceEpoch FROM testTable ORDER BY
column5, daysSinceEpoch";
- selectionOrderByOperator = getOperator(query);
- assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 10L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 10 * (2 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 126164076);
+ }
- // Test query order by all sorted columns in descending order
- query = "SELECT column5 FROM testTable ORDER BY column5 DESC,
daysSinceEpoch DESC";
- selectionOrderByOperator = getOperator(query);
+ @Test
+ public void testSelectionOrderByAllSortedColumnsDescDidOrder() {
+ String query = "SET allowReverseOrder = false;"
+ + "SELECT column5 "
+ + "FROM testTable "
+ + "ORDER BY column5 DESC, daysSinceEpoch DESC";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 30000 * (2 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 167572854);
+ }
- // Test query order by one sorted column in ascending order, the other
sorted column in descending order
- query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5,
daysSinceEpoch DESC";
- selectionOrderByOperator = getOperator(query);
- assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ @Test
+ public void testSelectionOrderByAllSortedColumnsDescReverseOrder() {
+ String query = "SET allowReverseOrder = true;"
+ + "SELECT column5 "
+ + "FROM testTable "
+ + "ORDER BY column5 DESC, daysSinceEpoch DESC";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 10L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ // 10 * (2 columns)
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 20L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
+ assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
+ List<Object[]> selectionResult = resultsBlock.getRows();
+ assertEquals(selectionResult.size(), 10);
+ Object[] lastRow = selectionResult.get(9);
+ assertEquals(lastRow.length, 2);
+ assertEquals(lastRow[0], "gFuH");
+ assertEquals(lastRow[1], 167572854);
+ }
+
+ @Test
+ public void testSelectionOrderByMixedSortedColumns() {
+ String query = "SELECT daysSinceEpoch FROM testTable ORDER BY column5,
daysSinceEpoch DESC";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 30000 * (2 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 60000L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new
ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 167572854);
+ }
- // Test query order by one sorted column in ascending order, and some
unsorted columns
- query = "SELECT column1 FROM testTable ORDER BY column5, column6, column1";
- selectionOrderByOperator = getOperator(query);
- assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByAscOperator);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ @Test
+ public void testSelectionOrderBySortedAndUnsortedColumnsAsc() {
+ String query = "SELECT column1 FROM testTable ORDER BY column5, column6,
column1";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 30000 * (3 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column6", "column1"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.INT});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
- // Unsorted column values should be the same as ordering by their own
assertEquals(lastRow[1], 6043515);
assertEquals(lastRow[2], 10542595);
+ }
- // Test query order by one sorted column in descending order, and some
unsorted columns
- query = "SELECT column6 FROM testTable ORDER BY column5 DESC, column6,
column1";
- selectionOrderByOperator = getOperator(query);
+ @Test
+ public void testSelectionOrderBySortedAndUnsortedColumnsDescDidOrder() {
+ String query = "SET allowReverseOrder = false; "
+ + "SELECT column6 FROM testTable ORDER BY column5 DESC, column6,
column1";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByDescOperation);
- resultsBlock = selectionOrderByOperator.nextBlock();
- executionStatistics = selectionOrderByOperator.getExecutionStatistics();
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
// 30000 * (3 columns)
assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
- dataSchema = resultsBlock.getDataSchema();
+ DataSchema dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column6", "column1"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.INT});
- selectionResult = resultsBlock.getRows();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.get(9);
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
+ assertEquals(lastRow[1], 6043515);
// Unsorted column values should be the same as ordering by their own
+ assertEquals(lastRow[2], 10542595);
+ }
+
+ @Test
+ public void testSelectionOrderBySortedAndUnsortedColumnsDescReverseOrder() {
+ String query = "SET allowReverseOrder = true; "
+ + "SELECT column6 FROM testTable ORDER BY column5 DESC, column6,
column1";
+ BaseOperator<SelectionResultsBlock> selectionOrderByOperator =
getOperator(query);
+ assertTrue(selectionOrderByOperator instanceof
SelectionPartiallyOrderedByLinearOperator);
+ SelectionResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
+ ExecutionStatistics executionStatistics =
selectionOrderByOperator.getExecutionStatistics();
+ assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
+ assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
+ assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 90000L);
+ assertEquals(executionStatistics.getNumTotalDocs(), 30000L);
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"column5",
"column6", "column1"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT,
ColumnDataType.INT});
+ List<Object[]> selectionResult = resultsBlock.getRows();
+ assertEquals(selectionResult.size(), 10);
+ Object[] lastRow = selectionResult.get(9);
+ assertEquals(lastRow.length, 3);
+ assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 6043515);
+ // Unsorted column values should be the same as ordering by their own
assertEquals(lastRow[2], 10542595);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
index 0ea1e8b8b95..b96d16b0c54 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/TextMatchTransformFunctionTest.java
@@ -374,7 +374,8 @@ public class TextMatchTransformFunctionTest {
+ "BROKER_REDUCE(sort:[startTime ASC],limit:1000) | 1 | 0\n"
+ "COMBINE_SELECT_ORDERBY_MINMAX | 2 | 1\n"
+ "PLAN_START(numSegmentsForThisPlan:1) | -1 | -1\n"
- + "SELECT_PARTIAL_ORDER_BY_ASC(sortedList: (startTime),
unsortedList: (), rest: (case(is_null(agent),"
+ + "SELECT_PARTIAL_ORDER_BY_LINEAR("
+ + "sortedList: (startTime), unsortedList: (), rest:
(case(is_null(agent),"
+
"'N/A',and(text_match(part,'_zz_'),is_not_null(part)),agent,''))) | 3 | 2\n"
+
"TRANSFORM(case(is_null(agent),'N/A',and(text_match(part,'_zz_'),is_not_null(part)),agent,''),
"
+ "startTime) | 4 | 3\n"
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
index 41ca58636c6..884673617e6 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOrderByQueries.java
@@ -21,14 +21,10 @@ package org.apache.pinot.perf;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.queries.BaseQueriesTest;
@@ -36,20 +32,12 @@ import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tools.SortedColumnQuickstart;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -77,46 +65,17 @@ public class BenchmarkOrderByQueries extends
BaseQueriesTest {
public static void main(String[] args)
throws Exception {
- ChainedOptionsBuilder opt = new
OptionsBuilder().include(BenchmarkOrderByQueries.class.getSimpleName());
+ ChainedOptionsBuilder opt = new OptionsBuilder()
+ .include(BenchmarkOrderByQueries.class.getSimpleName());
new Runner(opt.build()).run();
}
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"FilteredAggregationsTest");
- private static final String TABLE_NAME = "MyTable";
private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
- private static final String INT_COL_NAME = "INT_COL";
- private static final String SORTED_COL_NAME = "SORTED_COL";
- private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
- private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
- private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
- private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
- private static final String LOW_CARDINALITY_STRING_COL =
"LOW_CARDINALITY_STRING_COL";
- private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
-
- private static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setInvertedIndexColumns(List.of(INT_COL_NAME,
LOW_CARDINALITY_STRING_COL))
- .setFieldConfigList(FIELD_CONFIGS)
- .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
- .setSortedColumn(SORTED_COL_NAME)
- .setRangeIndexColumns(List.of(INT_COL_NAME, LOW_CARDINALITY_STRING_COL))
- .setStarTreeIndexConfigs(Collections.singletonList(
- new StarTreeIndexConfig(Arrays.asList(SORTED_COL_NAME,
INT_COL_NAME), null, Collections.singletonList(
- new AggregationFunctionColumnPair(AggregationFunctionType.SUM,
RAW_INT_COL_NAME).toColumnName()), null,
- Integer.MAX_VALUE)))
- .build();
- private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
- .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
- .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
- .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
- .addSingleValueDimension(RAW_STRING_COL_NAME, FieldSpec.DataType.STRING)
- .addSingleValueDimension(NO_INDEX_STRING_COL, FieldSpec.DataType.STRING)
- .addSingleValueDimension(LOW_CARDINALITY_STRING_COL,
FieldSpec.DataType.STRING)
- .build();
-
- @Param({"true", "false"})
- private boolean _zasc; // called zasc just to force this parameter to be the
last used in the report
+
+ @Param({"ASC", "NAIVE_DESC", "OPTIMIZED_DESC"})
+ private Mode _zMode; // called zMode just to force this parameter to be the
last used in the report
@Param("1500000")
private int _numRows;
//@Param({"EXP(0.5)"})
@@ -138,7 +97,10 @@ public class BenchmarkOrderByQueries extends
BaseQueriesTest {
buildSegment(FIRST_SEGMENT_NAME);
buildSegment(SECOND_SEGMENT_NAME);
- IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(
+ SortedColumnQuickstart.SortedTable.TABLE_CONFIG,
+ SortedColumnQuickstart.SortedTable.SCHEMA
+ );
ImmutableSegment firstImmutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, FIRST_SEGMENT_NAME),
indexLoadingConfig);
@@ -158,34 +120,16 @@ public class BenchmarkOrderByQueries extends
BaseQueriesTest {
EXECUTOR_SERVICE.shutdownNow();
}
- private List<GenericRow> createTestData(int numRows) {
- Map<Integer, String> strings = new HashMap<>();
- List<GenericRow> rows = new ArrayList<>();
- String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i ->
"value" + i)
- .toArray(String[]::new);
- for (int i = 0; i < numRows; i += _primaryRepetitions) {
- for (int j = 0; j < _primaryRepetitions; j++) {
- GenericRow row = new GenericRow();
- row.putValue(SORTED_COL_NAME, i);
- row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
- row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
- row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
- row.putValue(RAW_STRING_COL_NAME, strings.computeIfAbsent(
- (int) _supplier.getAsLong(), k -> UUID.randomUUID().toString()));
- row.putValue(NO_INDEX_STRING_COL, row.getValue(RAW_STRING_COL_NAME));
- row.putValue(LOW_CARDINALITY_STRING_COL, lowCardinalityValues[(i + j)
% lowCardinalityValues.length]);
- rows.add(row);
- }
- }
- return rows;
- }
-
private void buildSegment(String segmentName)
throws Exception {
- List<GenericRow> rows = createTestData(_numRows);
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG,
SCHEMA);
+ List<GenericRow> rows =
SortedColumnQuickstart.SortedTable.streamData(_primaryRepetitions, _supplier)
+ .limit(_numRows)
+ .collect(Collectors.toCollection(() -> new ArrayList<>(_numRows)));
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(
+ SortedColumnQuickstart.SortedTable.TABLE_CONFIG,
+ SortedColumnQuickstart.SortedTable.SCHEMA);
config.setOutDir(INDEX_DIR.getPath());
- config.setTableName(TABLE_NAME);
+
config.setTableName(SortedColumnQuickstart.SortedTable.SCHEMA.getSchemaName());
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
@@ -197,34 +141,57 @@ public class BenchmarkOrderByQueries extends
BaseQueriesTest {
@Benchmark
public BrokerResponseNative sortedTotally() {
- if (_zasc) {
- return getBrokerResponse(
- "SELECT SORTED_COL "
- + "FROM MyTable "
- + "ORDER BY SORTED_COL ASC "
- + "LIMIT " + _limit);
- } else {
- return getBrokerResponse(
- "SELECT SORTED_COL "
- + "FROM MyTable "
- + "ORDER BY SORTED_COL DESC "
- + "LIMIT " + _limit);
+ switch (_zMode) {
+ case ASC:
+ return getBrokerResponse(
+ "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL ASC "
+ + "LIMIT " + _limit);
+ case NAIVE_DESC:
+ return getBrokerResponse(
+ "SET allowReverseOrder=false;"
+ + "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL DESC "
+ + "LIMIT " + _limit);
+ case OPTIMIZED_DESC:
+ return getBrokerResponse(
+ "SET allowReverseOrder=true;"
+ + "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL DESC "
+ + "LIMIT " + _limit);
+ default:
+ throw new IllegalStateException("Unknown mode: " + _zMode);
}
}
+
@Benchmark
public BrokerResponseNative sortedPartially() {
- if (_zasc) {
- return getBrokerResponse(
- "SELECT SORTED_COL "
- + "FROM MyTable "
- + "ORDER BY SORTED_COL ASC, LOW_CARDINALITY_STRING_COL "
- + "LIMIT " + _limit);
- } else {
- return getBrokerResponse(
- "SELECT SORTED_COL "
- + "FROM MyTable "
- + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
- + "LIMIT " + _limit);
+ switch (_zMode) {
+ case ASC:
+ return getBrokerResponse(
+ "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL ASC, LOW_CARDINALITY_STRING_COL "
+ + "LIMIT " + _limit);
+ case NAIVE_DESC:
+ return getBrokerResponse(
+ "SET allowReverseOrder=false;"
+ + "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
+ + "LIMIT " + _limit);
+ case OPTIMIZED_DESC:
+ return getBrokerResponse(
+ "SET allowReverseOrder=true;"
+ + "SELECT SORTED_COL "
+ + "FROM sorted "
+ + "ORDER BY SORTED_COL DESC, LOW_CARDINALITY_STRING_COL "
+ + "LIMIT " + _limit);
+ default:
+ throw new IllegalStateException("Unknown mode: " + _zMode);
}
}
@@ -242,4 +209,8 @@ public class BenchmarkOrderByQueries extends
BaseQueriesTest {
protected List<IndexSegment> getIndexSegments() {
return _indexSegments;
}
+
+ public enum Mode {
+ ASC, NAIVE_DESC, OPTIMIZED_DESC
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
index 191b1b62c40..f09e06ae11a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitSVForwardIndexReaderV2.java
@@ -63,6 +63,16 @@ public final class FixedBitSVForwardIndexReaderV2 implements
ForwardIndexReader<
@Override
public void readDictIds(int[] docIds, int length, int[] dictIdBuffer,
ForwardIndexReaderContext context) {
+ int firstDocId = docIds[0];
+ int lastDocId = docIds[length - 1];
+ if (firstDocId <= lastDocId) {
+ readDictIdsAsc(docIds, length, dictIdBuffer, context);
+ } else {
+ readDictIdsDesc(docIds, length, dictIdBuffer, context);
+ }
+ }
+
+ private void readDictIdsAsc(int[] docIds, int length, int[] dictIdBuffer,
ForwardIndexReaderContext context) {
int firstDocId = docIds[0];
int lastDocId = docIds[length - 1];
int index = 0;
@@ -98,6 +108,13 @@ public final class FixedBitSVForwardIndexReaderV2
implements ForwardIndexReader<
}
}
+ private void readDictIdsDesc(int[] docIds, int length, int[] dictIdBuffer,
ForwardIndexReaderContext context) {
+ // TODO: Implement bulk read for descending order
+ for (int i = 0; i < length; i++) {
+ dictIdBuffer[i] = _reader.read(docIds[i]);
+ }
+ }
+
@Override
public void close() {
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7efc2c3c593..ed54afa62e5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -687,7 +687,8 @@ public class CommonConstants {
// Query option key used to enable a given set of rules disabled by
default
public static final String USE_PLANNER_RULES = "usePlannerRules";
- public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
+ public static final String ALLOW_REVERSE_ORDER = "allowReverseOrder";
+ public static final boolean DEFAULT_ALLOW_REVERSE_ORDER = false;
public static final String MULTI_STAGE_LEAF_LIMIT =
"multiStageLeafLimit";
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java
new file mode 100644
index 00000000000..b7756ace228
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/SortedColumnQuickstart.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.tools.admin.PinotAdministrator;
+
+/// Notice: In order to run this quickstart, you need to run the SortedTable
main method once to
+/// generate the content in the
pinot-tools/target/classes/examples/batch/sorted folder.
+public class SortedColumnQuickstart extends Quickstart {
+ private static final String QUICKSTART_IDENTIFIER = "SORTED";
+ @Override
+ public List<String> types() {
+ return Collections.singletonList(QUICKSTART_IDENTIFIER);
+ }
+
+ @Override
+ protected String[] getDefaultBatchTableDirectories() {
+ // contrary to other quickstarts, here we create the content automatically.
+ // it is important to notice this path is not a file path but a resource
path on the classpath.
+ // this means we can create the content in the
pinot-tools/target/classes/examples/batch/sorted folder
+ return new String[] { "examples/batch/sorted" };
+ }
+
+ @Override
+ protected Map<String, String> getDefaultStreamTableDirectories() {
+ return Map.of();
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ List<String> arguments = new ArrayList<>();
+ arguments.addAll(Arrays.asList("QuickStart", "-type",
QUICKSTART_IDENTIFIER));
+ arguments.addAll(Arrays.asList(args));
+ PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
+ }
+
+ public static class SortedTable {
+ private static final String TABLE_NAME = "sorted";
+ private static final String INT_COL_NAME = "INT_COL";
+ private static final String SORTED_COL_NAME = "SORTED_COL";
+ private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
+ private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
+ private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
+ private static final String NO_INDEX_STRING_COL = "NO_INDEX_STRING_COL";
+ private static final String LOW_CARDINALITY_STRING_COL =
"LOW_CARDINALITY_STRING_COL";
+ private static final List<FieldConfig> FIELD_CONFIGS = new ArrayList<>();
+
+ public static final TableConfig TABLE_CONFIG = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setInvertedIndexColumns(List.of(INT_COL_NAME,
LOW_CARDINALITY_STRING_COL))
+ .setFieldConfigList(FIELD_CONFIGS)
+ .setNoDictionaryColumns(List.of(RAW_INT_COL_NAME, RAW_STRING_COL_NAME))
+ .setSortedColumn(SORTED_COL_NAME)
+ .setRangeIndexColumns(List.of(INT_COL_NAME,
LOW_CARDINALITY_STRING_COL))
+ .setStarTreeIndexConfigs(Collections.singletonList(
+ new StarTreeIndexConfig(Arrays.asList(SORTED_COL_NAME,
INT_COL_NAME), null, Collections.singletonList(
+ new AggregationFunctionColumnPair(AggregationFunctionType.SUM,
RAW_INT_COL_NAME).toColumnName()), null,
+ Integer.MAX_VALUE)))
+ .build();
+ public static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
+ .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
+ .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
+ .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)
+ .addSingleValueDimension(RAW_STRING_COL_NAME,
FieldSpec.DataType.STRING)
+ .addSingleValueDimension(NO_INDEX_STRING_COL,
FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LOW_CARDINALITY_STRING_COL,
FieldSpec.DataType.STRING)
+ .build();
+
+ public static void main(String[] args)
+ throws Exception {
+ Path activeDir = new File("").getAbsoluteFile().toPath();
+ Path pinotToolsPath;
+ if (activeDir.endsWith("pinot")) {
+ pinotToolsPath = activeDir.resolve("pinot-tools");
+ } else {
+ pinotToolsPath = activeDir;
+ }
+ Path bootstrapDir = pinotToolsPath.resolve(Path.of("src", "main",
"resources", "examples", "batch", "sorted"));
+ prepareBootstrapDir(bootstrapDir);
+ }
+
+ private static void prepareBootstrapDir(Path bootstrapDir)
+ throws IOException {
+ File bootstrapDirFile = bootstrapDir.toFile();
+ bootstrapDirFile.mkdirs();
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ File tableConfigFile =
bootstrapDir.resolve("sorted_offline_table_config.json").toFile();
+
objectMapper.writerWithDefaultPrettyPrinter().writeValue(tableConfigFile,
SortedTable.TABLE_CONFIG);
+ File schemaFile = bootstrapDir.resolve("sorted_schema.json").toFile();
+ objectMapper.writerWithDefaultPrettyPrinter().writeValue(schemaFile,
SortedTable.SCHEMA);
+
+ File rawdata = bootstrapDir.resolve("rawdata").toFile();
+ rawdata.mkdirs();
+
+ try (FileWriter fw = new FileWriter(new File(rawdata, "data.csv"))) {
+ // write the CSV header
+ fw.append(String.join(",", SortedTable.SORTED_COL_NAME,
SortedTable.INT_COL_NAME,
+ SortedTable.NO_INDEX_INT_COL_NAME, SortedTable.RAW_INT_COL_NAME,
SortedTable.RAW_STRING_COL_NAME,
+ SortedTable.NO_INDEX_STRING_COL,
SortedTable.LOW_CARDINALITY_STRING_COL)).append('\n');
+
+ Random r = new Random(42);
+ streamData(1000, r::nextLong)
+ .limit(100_000)
+ .forEach(s -> {
+ // generate CSV line
+ try {
+
fw.append(String.valueOf(s.getValue(SortedTable.SORTED_COL_NAME))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.INT_COL_NAME))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.NO_INDEX_INT_COL_NAME))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.RAW_INT_COL_NAME))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.RAW_STRING_COL_NAME))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.NO_INDEX_STRING_COL))).append(',');
+
fw.append(String.valueOf(s.getValue(SortedTable.LOW_CARDINALITY_STRING_COL))).append('\n');
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ public static Stream<GenericRow> streamData(int primaryRepetitions,
LongSupplier longSupplier) {
+ Map<Integer, String> strings = new HashMap<>();
+ String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i ->
"value" + i)
+ .toArray(String[]::new);
+ return IntStream.iterate(0, i -> i + 1)
+ .boxed()
+ .flatMap(i -> IntStream.range(0, primaryRepetitions)
+ .mapToObj(j -> {
+ GenericRow row = new GenericRow();
+ row.putValue(SortedTable.SORTED_COL_NAME, i);
+ row.putValue(SortedTable.INT_COL_NAME, (int)
longSupplier.getAsLong());
+ row.putValue(SortedTable.NO_INDEX_INT_COL_NAME, (int)
longSupplier.getAsLong());
+ row.putValue(SortedTable.RAW_INT_COL_NAME, (int)
longSupplier.getAsLong());
+ row.putValue(SortedTable.RAW_STRING_COL_NAME,
strings.computeIfAbsent(
+ (int) longSupplier.getAsLong(), k ->
UUID.randomUUID().toString()));
+ row.putValue(SortedTable.NO_INDEX_STRING_COL,
row.getValue(SortedTable.RAW_STRING_COL_NAME));
+ int lowCardinalityIndex = (i + j) %
lowCardinalityValues.length;
+ row.putValue(SortedTable.LOW_CARDINALITY_STRING_COL,
lowCardinalityValues[lowCardinalityIndex]);
+ return row;
+ })
+ );
+ }
+ }
+}
diff --git
a/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml
b/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml
new file mode 100644
index 00000000000..e17fe2e93e4
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/batch/sorted/ingestionJobSpec.yaml
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# executionFrameworkSpec: Defines ingestion jobs to be running.
+executionFrameworkSpec:
+
+ # name: execution framework name
+ name: 'standalone'
+
+ # Class to use for segment generation and different push types.
+ segmentGenerationJobRunnerClassName:
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
+ segmentTarPushJobRunnerClassName:
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
+ segmentUriPushJobRunnerClassName:
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
+ segmentMetadataPushJobRunnerClassName:
'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner'
+
+# jobType: Pinot ingestion job type.
+# Supported job types are defined in PinotIngestionJobType class.
+# 'SegmentCreation'
+# 'SegmentTarPush'
+# 'SegmentUriPush'
+# 'SegmentMetadataPush'
+# 'SegmentCreationAndTarPush'
+# 'SegmentCreationAndUriPush'
+# 'SegmentCreationAndMetadataPush'
+jobType: SegmentCreationAndTarPush
+
+# inputDirURI: Root directory of input data, expected to have scheme
configured in PinotFS.
+inputDirURI: 'examples/batch/sorted/rawdata'
+
+# includeFileNamePattern: include file name pattern, supported glob pattern.
+# Sample usage:
+# 'glob:*.csv' will include all csv files just under the inputDirURI, not
sub directories;
+# 'glob:**/*.csv' will include all the csv files under inputDirURI
recursively.
+includeFileNamePattern: 'glob:**/*.csv'
+
+# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
+# Sample usage:
+# 'glob:*.csv' will exclude all csv files just under the inputDirURI, not
sub directories;
+# 'glob:**/*.csv' will exclude all the csv files under inputDirURI
recursively.
+# _excludeFileNamePattern: ''
+
+# outputDirURI: Root directory of output segments, expected to have scheme
configured in PinotFS.
+outputDirURI: 'examples/batch/sorted/segments'
+
+# segmentCreationJobParallelism: Parallelism to build Pinot segments.
+segmentCreationJobParallelism: 4
+
+# overwriteOutput: Overwrite output segments if existed.
+overwriteOutput: true
+
+# pinotFSSpecs: defines all related Pinot file systems.
+pinotFSSpecs:
+
+ - # scheme: used to identify a PinotFS.
+ # E.g. local, hdfs, dbfs, etc
+ scheme: file
+
+ # className: Class name used to create the PinotFS instance.
+ # E.g.
+ # org.apache.pinot.spi.filesystem.LocalPinotFS is used for local
filesystem
+ # org.apache.pinot.plugin.filesystem.AzurePinotFS is used for Azure Data
Lake
+ # org.apache.pinot.plugin.filesystem.HadoopPinotFS is used for HDFS
+ className: org.apache.pinot.spi.filesystem.LocalPinotFS
+
+# recordReaderSpec: defines all record reader
+recordReaderSpec:
+
+ # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv',
'json', 'thrift' etc.
+ dataFormat: 'csv'
+
+ # className: Corresponding RecordReader class name.
+ className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
+
+# tableSpec: defines table name and where to fetch corresponding table config
and table schema.
+tableSpec:
+
+ # tableName: Table name
+ tableName: 'sorted'
+
+ # schemaURI: defines where to read the table schema, supports PinotFS or
HTTP.
+ # E.g.
+ # hdfs://path/to/table_schema.json
+ # http://localhost:9000/tables/myTable/schema
+ schemaURI: 'http://localhost:9000/tables/sorted/schema'
+
+  # tableConfigURI: defines where to read the table config.
+ # Supports using PinotFS or HTTP.
+ # E.g.
+ # hdfs://path/to/table_config.json
+ # http://localhost:9000/tables/myTable
+ # Note that the API to read Pinot table config directly from pinot
controller contains a JSON wrapper.
+ # The real table config is the object under the field 'OFFLINE'.
+ tableConfigURI: 'http://localhost:9000/tables/sorted'
+
+# pinotClusterSpecs: defines the Pinot Cluster Access Point.
+pinotClusterSpecs:
+ - # controllerURI: used to fetch table/schema information and data push.
+ # E.g. http://localhost:9000
+ controllerURI: 'http://localhost:9000'
+
+# pushJobSpec: defines segment push job related configuration.
+pushJobSpec:
+
+ # pushAttempts: number of attempts for push job, default is 1, which means
no retry.
+ pushAttempts: 2
+
+ # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
+ pushRetryIntervalMillis: 1000
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]