This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 174eb4b  Early termination for combining selection order-by results (#5686)
174eb4b is described below

commit 174eb4b99c1ccc0645e1ba47eb3f26a1c816efb5
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jul 23 10:34:56 2020 -0700

    Early termination for combining selection order-by results (#5686)

    Split `CombineOperator` into 3 different combine operators:
    - `SelectionOnlyCombineOperator`
    - `SelectionOrderByCombineOperator`
    - `AggregationOnlyCombineOperator`
    (For aggregation group-by, rename the combine operators to `GroupByCombineOperator` and `GroupByOrderByCombineOperator`)

    Re-implement the combine operator logic in `BaseCombineOperator` for the early-termination enhancement:
    - Worker threads no longer perform the result merge; instead, they insert the results block from each segment into the blocking queue
    - Worker threads can early-terminate if the results block for one segment can satisfy the query
    - The main thread merges the results blocks from the worker threads, and early-terminates if the merged results block can satisfy the query

    For `SelectionOnlyCombineOperator`:
    - Only process 1 segment for `LIMIT 0`
    - Early-terminate once enough rows are collected

    For `SelectionOrderByCombineOperator`:
    - If the first order-by expression is an identifier (column), sort the segments by the column min/max value and early-terminate once the results from a segment cannot appear in the final result
---
 .../pinot/core/operator/CombineOperator.java       | 226 -------------
 .../pinot/core/operator/ExecutionStatistics.java   |  46 +--
 .../operator/blocks/IntermediateResultsBlock.java  |  52 ++-
 .../combine/AggregationOnlyCombineOperator.java    |  58 ++++
 .../core/operator/combine/BaseCombineOperator.java | 185 +++++++++++
 .../operator/combine/CombineOperatorUtils.java     |  74 +++++
 .../GroupByCombineOperator.java}                   |  27 +-
 .../GroupByOrderByCombineOperator.java}            |  30 +-
 .../combine/SelectionOnlyCombineOperator.java      |  95 ++++++
 .../combine/SelectionOrderByCombineOperator.java   | 356 +++++++++++++++++++++
 .../operator/query/SelectionOrderByOperator.java   |  18 +-
 .../apache/pinot/core/plan/CombinePlanNode.java    |  36 ++-
 .../pinot/core/query/reduce/CombineService.java    | 145 ---------
 .../{ => combine}/CombineSlowOperatorsTest.java    |  36 ++-
 .../combine/SelectionCombineOperatorTest.java      | 243 ++++++++++++++
 .../pinot/queries/BaseSingleValueQueriesTest.java  |  12 +-
 .../queries/SelectionOnlyEarlyTerminationTest.java | 124 -------
 17 files changed, 1139 insertions(+), 624 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
deleted file mode 100644
index accd0f0..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.query.exception.EarlyTerminationException; -import org.apache.pinot.core.query.reduce.CombineService; -import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; -import org.apache.pinot.core.util.trace.TraceCallable; -import org.apache.pinot.core.util.trace.TraceRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The <code>CombineOperator</code> class is the operator to combine selection results and aggregation only results. - */ -@SuppressWarnings("rawtypes") -public class CombineOperator extends BaseOperator<IntermediateResultsBlock> { - private static final Logger LOGGER = LoggerFactory.getLogger(CombineOperator.class); - private static final String OPERATOR_NAME = "CombineOperator"; - - // Use at most 10 or half of the processors threads for each query. - // If there are less than 2 processors, use 1 thread. - // Runtime.getRuntime().availableProcessors() may return value < 2 in container based environment, e.g. Kubernetes. - public static final int MAX_NUM_THREADS_PER_QUERY = - Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)); - - private final List<Operator> _operators; - private final QueryContext _queryContext; - private final ExecutorService _executorService; - private final long _timeOutMs; - - public CombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, - long timeOutMs) { - _operators = operators; - _queryContext = queryContext; - _executorService = executorService; - _timeOutMs = timeOutMs; - } - - @Override - protected IntermediateResultsBlock getNextBlock() { - long startTimeMs = System.currentTimeMillis(); - long endTimeMs = startTimeMs + _timeOutMs; - int numOperators = _operators.size(); - // Try to use all MAX_NUM_THREADS_PER_QUERY threads for the query, but ensure each thread has at least one operator - int numThreads = Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY); - - // We use a BlockingQueue to store the results for each operator group, and track if all operator groups are - // finished by the query timeout, and cancel the unfinished futures (try to interrupt the execution if it already - // started). - // Besides the BlockingQueue, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or - // interrupted) before the main thread returns. 
We need to ensure no execution left before the main thread returning - // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the - // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash) - // when executing queries against them. - BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numThreads); - Phaser phaser = new Phaser(1); - - // Submit operator group execution jobs - Future[] futures = new Future[numThreads]; - for (int i = 0; i < numThreads; i++) { - int index = i; - futures[i] = _executorService.submit(new TraceRunnable() { - @Override - public void runJob() { - try { - // Register the thread to the phaser. - // If the phaser is terminated (returning negative value) when trying to register the thread, that means the - // query execution has timed out, and the main thread has deregistered itself and returned the result. - // Directly return as no execution result will be taken. - if (phaser.register() < 0) { - return; - } - - IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock) _operators.get(index).nextBlock(); - for (int i = index + numThreads; i < numOperators; i += numThreads) { - if (isQuerySatisfied(_queryContext, mergedBlock)) { - break; - } - IntermediateResultsBlock blockToMerge = (IntermediateResultsBlock) _operators.get(i).nextBlock(); - try { - CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge); - } catch (Exception e) { - LOGGER.error("Caught exception while merging two blocks (step 1).", e); - mergedBlock - .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e)); - } - } - blockingQueue.offer(mergedBlock); - } catch (EarlyTerminationException e) { - // Early-terminated because query times out or is already satisfied - } catch (Exception e) { - LOGGER.error("Caught exception while executing query.", e); - blockingQueue.offer(new IntermediateResultsBlock(e)); - } finally { - phaser.arriveAndDeregister(); - } - } - }); - } - - // Submit operator groups merge job - Future<IntermediateResultsBlock> mergedBlockFuture = - _executorService.submit(new TraceCallable<IntermediateResultsBlock>() { - @Override - public IntermediateResultsBlock callJob() - throws Exception { - IntermediateResultsBlock mergedBlock = - blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (mergedBlock == null) { - throw new TimeoutException("Timed out while polling result from first thread"); - } - int numMergedBlocks = 1; - while (numMergedBlocks < numThreads) { - if (isQuerySatisfied(_queryContext, mergedBlock)) { - break; - } - IntermediateResultsBlock blockToMerge = - blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (blockToMerge == null) { - throw new TimeoutException("Timed out while polling result from thread: " + numMergedBlocks); - } - try { - CombineService.mergeTwoBlocks(_queryContext, mergedBlock, blockToMerge); - } catch (Exception e) { - LOGGER.error("Caught exception while merging two blocks (step 2).", e); - mergedBlock - .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e)); - } - numMergedBlocks++; - } - return mergedBlock; - } - }); - - // Get merge results. 
- IntermediateResultsBlock mergedBlock; - try { - mergedBlock = mergedBlockFuture.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException. (queryContext = {})", _queryContext, e); - mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.FUTURE_CALL_ERROR, e)); - } catch (ExecutionException e) { - LOGGER.error("Caught ExecutionException. (queryContext = {})", _queryContext, e); - mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, e)); - } catch (TimeoutException e) { - LOGGER.error("Caught TimeoutException. (queryContext = {})", _queryContext, e); - mergedBlockFuture.cancel(true); - mergedBlock = - new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, e)); - } finally { - // Cancel all ongoing jobs - for (Future future : futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - // Deregister the main thread and wait for all threads done - phaser.awaitAdvance(phaser.arriveAndDeregister()); - } - - // Update execution statistics. - ExecutionStatistics executionStatistics = new ExecutionStatistics(); - for (Operator operator : _operators) { - executionStatistics.merge(operator.getExecutionStatistics()); - } - mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned()); - mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter()); - mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter()); - mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs()); - mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed()); - mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched()); - - return mergedBlock; - } - - /** - * Returns {@code true} if the query is already satisfied with the IntermediateResultsBlock so that there is no need - * to process more segments, {@code false} otherwise. - * <p>For selection-only query, the query is satisfied when enough records are gathered. - */ - private boolean isQuerySatisfied(QueryContext queryContext, IntermediateResultsBlock mergedBlock) { - if (!QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getOrderByExpressions() == null) { - // Selection-only - Collection<Object[]> selectionResult = mergedBlock.getSelectionResult(); - return selectionResult != null && selectionResult.size() >= queryContext.getLimit(); - } - return false; - } - - @Override - public String getOperatorName() { - return OPERATOR_NAME; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java index 1c99d24..04c2665 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ExecutionStatistics.java @@ -23,7 +23,7 @@ package org.apache.pinot.core.operator; */ public class ExecutionStatistics { // The number of documents scanned post filtering. - private long _numDocsScanned; + private final long _numDocsScanned; // The number of entries (single value entry contains 1 value, multi-value entry may contain multiple values) scanned // in the filtering phase of the query execution: could be larger than the total scanned doc num because of multiple // filtering predicates and multi-value entry. 
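The reworked combine flow summarized in the commit message boils down to a producer/consumer pattern. A condensed, runnable sketch of that pattern (illustrative class name and integer stand-ins, not Pinot code): worker threads offer one result per segment into a bounded blocking queue, and the main thread polls, merges, and stops early once the merged result satisfies the query or the timeout expires.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class CombineFlowSketch {
      public static void main(String[] args) throws InterruptedException {
        int numSegments = 4;
        // Stand-in for the queue of per-segment IntermediateResultsBlocks
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(numSegments);

        // Workers: produce one result per segment instead of pre-merging
        for (int i = 0; i < numSegments; i++) {
          int segmentResult = i + 1; // stand-in for one segment's results block
          new Thread(() -> blockingQueue.offer(segmentResult)).start();
        }

        // Main thread: poll and merge until satisfied or timed out
        int merged = 0;
        int numBlocksMerged = 0;
        while (numBlocksMerged < numSegments) {
          Integer blockToMerge = blockingQueue.poll(1, TimeUnit.SECONDS);
          if (blockToMerge == null) {
            break; // timeout: stop merging, mirroring the EXECUTION_TIMEOUT_ERROR path
          }
          merged += blockToMerge; // stand-in for mergeResultsBlocks(merged, blockToMerge)
          numBlocksMerged++;
          if (merged >= 5) {
            break; // early termination once the merged result satisfies the "query"
          }
        }
        System.out.println("merged = " + merged + " after " + numBlocksMerged + " blocks");
      }
    }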
@@ -31,15 +31,13 @@ public class ExecutionStatistics { // _numEntriesScannedInFilter counts values in a multi-value entry multiple times whereas in // _numEntriesScannedPostFilter counts all values in a multi-value entry only once. We can add a new stats // _numValuesScannedInFilter to replace _numEntriesScannedInFilter. - private long _numEntriesScannedInFilter; + private final long _numEntriesScannedInFilter; // Equal to numDocsScanned * numberProjectedColumns. - private long _numEntriesScannedPostFilter; - private long _numTotalDocs; - private long _numSegmentsProcessed; - private long _numSegmentsMatched; + private final long _numEntriesScannedPostFilter; - public ExecutionStatistics() { - } + // TODO: Remove _numTotalDocs because it is not execution stats, and it is not set from the operators because they + // don't cover the pruned segments + private final long _numTotalDocs; public ExecutionStatistics(long numDocsScanned, long numEntriesScannedInFilter, long numEntriesScannedPostFilter, long numTotalDocs) { @@ -47,8 +45,6 @@ public class ExecutionStatistics { _numEntriesScannedInFilter = numEntriesScannedInFilter; _numEntriesScannedPostFilter = numEntriesScannedPostFilter; _numTotalDocs = numTotalDocs; - _numSegmentsProcessed = 1; - _numSegmentsMatched = (numDocsScanned == 0) ? 0 : 1; } public long getNumDocsScanned() { @@ -66,34 +62,4 @@ public class ExecutionStatistics { public long getNumTotalDocs() { return _numTotalDocs; } - - public long getNumSegmentsProcessed() { - return _numSegmentsProcessed; - } - - public long getNumSegmentsMatched() { - return _numSegmentsMatched; - } - - /** - * Merge another execution statistics into the current one. - * - * @param executionStatisticsToMerge execution statistics to merge. - */ - public void merge(ExecutionStatistics executionStatisticsToMerge) { - _numDocsScanned += executionStatisticsToMerge._numDocsScanned; - _numEntriesScannedInFilter += executionStatisticsToMerge._numEntriesScannedInFilter; - _numEntriesScannedPostFilter += executionStatisticsToMerge._numEntriesScannedPostFilter; - _numTotalDocs += executionStatisticsToMerge._numTotalDocs; - _numSegmentsProcessed += executionStatisticsToMerge._numSegmentsProcessed; - _numSegmentsMatched += executionStatisticsToMerge._numSegmentsMatched; - } - - @Override - public String toString() { - return "Execution Statistics:" + "\n numDocsScanned: " + _numDocsScanned + "\n numEntriesScannedInFilter: " - + _numEntriesScannedInFilter + "\n numEntriesScannedPostFilter: " + _numEntriesScannedPostFilter - + "\n numTotalDocs: " + _numTotalDocs + "\n numSegmentsProcessed: " + _numSegmentsProcessed - + "\n numSegmentsMatched: " + _numSegmentsMatched; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java index 63efe7c..d0eb2e2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.blocks; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -60,8 +61,8 @@ public class IntermediateResultsBlock implements Block { private long _numEntriesScannedInFilter; private long _numEntriesScannedPostFilter; private long _numTotalDocs; - private long 
_numSegmentsProcessed; - private long _numSegmentsMatched; + private int _numSegmentsProcessed; + private int _numSegmentsMatched; private boolean _numGroupsLimitReached; private Table _table; @@ -198,19 +199,11 @@ public class IntermediateResultsBlock implements Block { _numEntriesScannedPostFilter = numEntriesScannedPostFilter; } - public long getNumSegmentsProcessed() { - return _numSegmentsProcessed; - } - - public void setNumSegmentsProcessed(long numSegmentsProcessed) { + public void setNumSegmentsProcessed(int numSegmentsProcessed) { _numSegmentsProcessed = numSegmentsProcessed; } - public long getNumSegmentsMatched() { - return _numSegmentsMatched; - } - - public void setNumSegmentsMatched(long numSegmentsMatched) { + public void setNumSegmentsMatched(int numSegmentsMatched) { _numSegmentsMatched = numSegmentsMatched; } @@ -222,6 +215,41 @@ public class IntermediateResultsBlock implements Block { _numGroupsLimitReached = numGroupsLimitReached; } + @VisibleForTesting + public long getNumDocsScanned() { + return _numDocsScanned; + } + + @VisibleForTesting + public long getNumEntriesScannedInFilter() { + return _numEntriesScannedInFilter; + } + + @VisibleForTesting + public long getNumEntriesScannedPostFilter() { + return _numEntriesScannedPostFilter; + } + + @VisibleForTesting + public int getNumSegmentsProcessed() { + return _numSegmentsProcessed; + } + + @VisibleForTesting + public int getNumSegmentsMatched() { + return _numSegmentsMatched; + } + + @VisibleForTesting + public long getNumTotalDocs() { + return _numTotalDocs; + } + + @VisibleForTesting + public boolean isNumGroupsLimitReached() { + return _numGroupsLimitReached; + } + public DataTable getDataTable() throws Exception { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java new file mode 100644 index 0000000..83a474e --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/AggregationOnlyCombineOperator.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; + + +/** + * Combine operator for aggregation only queries. 
+ */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class AggregationOnlyCombineOperator extends BaseCombineOperator { + private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator"; + + public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService, long timeOutMs) { + super(operators, queryContext, executorService, timeOutMs); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { + AggregationFunction[] aggregationFunctions = mergedBlock.getAggregationFunctions(); + List<Object> mergedResults = mergedBlock.getAggregationResult(); + List<Object> resultsToMerge = blockToMerge.getAggregationResult(); + assert aggregationFunctions != null && mergedResults != null && resultsToMerge != null; + + int numAggregationFunctions = aggregationFunctions.length; + for (int i = 0; i < numAggregationFunctions; i++) { + mergedResults.set(i, aggregationFunctions[i].merge(mergedResults.get(i), resultsToMerge.get(i))); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java new file mode 100644 index 0000000..2d10a35 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.query.exception.EarlyTerminationException; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.trace.TraceRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base implementation of the combine operator. + * <p>Combine operator uses multiple worker threads to process segments in parallel, and uses the main thread to merge + * the results blocks from the processed segments. 
It can early-terminate the query to save the system resources if it + * detects that the merged results can already satisfy the query, or the query is already errored out or timed out. + */ +@SuppressWarnings("rawtypes") +public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> { + protected static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class); + + protected final List<Operator> _operators; + protected final QueryContext _queryContext; + protected final ExecutorService _executorService; + protected final long _timeOutMs; + + public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, + long timeOutMs) { + _operators = operators; + _queryContext = queryContext; + _executorService = executorService; + _timeOutMs = timeOutMs; + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + long startTimeMs = System.currentTimeMillis(); + long endTimeMs = startTimeMs + _timeOutMs; + int numOperators = _operators.size(); + int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators); + + // Use a BlockingQueue to store the per-segment result + BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators); + // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread + // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is + // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined + // behavior (even JVM crash) when processing queries against it. + Phaser phaser = new Phaser(1); + + Future[] futures = new Future[numThreads]; + for (int i = 0; i < numThreads; i++) { + int threadIndex = i; + futures[i] = _executorService.submit(new TraceRunnable() { + @Override + public void runJob() { + try { + // Register the thread to the phaser + // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that + // means the query execution has finished, and the main thread has deregistered itself and returned + // the result. Directly return as no execution result will be taken. 
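The Phaser handshake described in the note above can be isolated into a minimal runnable sketch (not Pinot code; the task body is an assumed placeholder). It shows why a late-arriving worker bails out when register() returns a negative value, and why awaitAdvance(arriveAndDeregister()) on the main thread guarantees that no worker outlives the combine call:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;

    public class PhaserHandshakeSketch {
      public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Phaser phaser = new Phaser(1); // the main thread is the initial registered party

        for (int i = 0; i < 2; i++) {
          executorService.submit(() -> {
            // A negative value means the phaser already terminated: the main
            // thread has deregistered and returned, so bail out without working.
            if (phaser.register() < 0) {
              return;
            }
            try {
              // ... per-segment work would go here ...
            } finally {
              phaser.arriveAndDeregister();
            }
          });
        }

        // Deregister the main thread, then block until every registered worker
        // has arrived, guaranteeing no worker outlives this method (and thus
        // no worker can touch segments after they may be released).
        phaser.awaitAdvance(phaser.arriveAndDeregister());
        executorService.shutdown();
      }
    }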
+ if (phaser.register() < 0) { + return; + } + + for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) { + try { + IntermediateResultsBlock resultsBlock = + (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); + if (isQuerySatisfied(resultsBlock)) { + // Query is satisfied, skip processing the remaining segments + blockingQueue.offer(resultsBlock); + return; + } else { + blockingQueue.offer(resultsBlock); + } + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, + _queryContext, e); + blockingQueue.offer(new IntermediateResultsBlock(e)); + return; + } + } + } finally { + phaser.arriveAndDeregister(); + } + } + }); + } + + IntermediateResultsBlock mergedBlock = null; + try { + int numBlocksMerged = 0; + while (numBlocksMerged < numOperators) { + IntermediateResultsBlock blockToMerge = + blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (blockToMerge == null) { + // Query times out, skip merging the remaining results blocks + LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, + _queryContext); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); + break; + } + if (blockToMerge.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip merging the remaining results blocks and directly return + // the exception + mergedBlock = blockToMerge; + break; + } + if (mergedBlock == null) { + mergedBlock = blockToMerge; + } else { + mergeResultsBlocks(mergedBlock, blockToMerge); + } + numBlocksMerged++; + if (isQuerySatisfied(mergedBlock)) { + // Query is satisfied, skip merging the remaining results blocks + break; + } + } + } catch (Exception e) { + LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); + } finally { + // Cancel all ongoing jobs + for (Future future : futures) { + if (!future.isDone()) { + future.cancel(true); + } + } + // Deregister the main thread and wait for all threads done + phaser.awaitAdvance(phaser.arriveAndDeregister()); + } + + CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); + return mergedBlock; + } + + /** + * Can be overridden for early termination. + */ + protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) { + return false; + } + + /** + * Merge an IntermediateResultsBlock into the main IntermediateResultsBlock. + * <p>NOTE: {@code blockToMerge} should contain the result for a segment without any exception. The errored segment + * result is already handled. 
+ */ + protected abstract void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, + IntermediateResultsBlock blockToMerge); +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java new file mode 100644 index 0000000..fd7737f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/CombineOperatorUtils.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.List; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; + + +@SuppressWarnings("rawtypes") +public class CombineOperatorUtils { + private CombineOperatorUtils() { + } + + /** + * Use at most 10 threads or half of the available processors for each query. If there are fewer than 2 processors, use 1 thread; e.g. with 16 available processors this yields min(10, 16 / 2) = 8 threads per query. + * <p>NOTE: Runtime.getRuntime().availableProcessors() may return a value < 2 in container-based environments, e.g. + * Kubernetes. + */ + public static final int MAX_NUM_THREADS_PER_QUERY = + Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)); + + /** + * Returns the number of threads used to execute the query in parallel. + */ + public static int getNumThreadsForQuery(int numOperators) { + return Math.min(numOperators, MAX_NUM_THREADS_PER_QUERY); + } + + /** + * Sets the execution statistics into the results block.
+ */ + public static void setExecutionStatistics(IntermediateResultsBlock resultsBlock, List<Operator> operators) { + int numSegmentsProcessed = operators.size(); + int numSegmentsMatched = 0; + long numDocsScanned = 0; + long numEntriesScannedInFilter = 0; + long numEntriesScannedPostFilter = 0; + long numTotalDocs = 0; + for (Operator operator : operators) { + ExecutionStatistics executionStatistics = operator.getExecutionStatistics(); + if (executionStatistics.getNumDocsScanned() > 0) { + numSegmentsMatched++; + } + numDocsScanned += executionStatistics.getNumDocsScanned(); + numEntriesScannedInFilter += executionStatistics.getNumEntriesScannedInFilter(); + numEntriesScannedPostFilter += executionStatistics.getNumEntriesScannedPostFilter(); + numTotalDocs += executionStatistics.getNumTotalDocs(); + } + resultsBlock.setNumSegmentsProcessed(numSegmentsProcessed); + resultsBlock.setNumSegmentsMatched(numSegmentsMatched); + resultsBlock.setNumDocsScanned(numDocsScanned); + resultsBlock.setNumEntriesScannedInFilter(numEntriesScannedInFilter); + resultsBlock.setNumEntriesScannedPostFilter(numEntriesScannedPostFilter); + resultsBlock.setNumTotalDocs(numTotalDocs); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java similarity index 90% rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 5b6b9e6..476b459 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.operator; +package org.apache.pinot.core.operator.combine; import java.util.ArrayList; import java.util.Iterator; @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; @@ -48,12 +49,15 @@ import org.slf4j.LoggerFactory; /** - * The <code>CombineGroupByOperator</code> class is the operator to combine aggregation group-by results. + * Combine operator for aggregation group-by queries with PQL semantic. 
+ * TODO: + * - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads + * - Try to extend BaseCombineOperator to reduce duplicate code */ @SuppressWarnings("rawtypes") -public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBlock> { - private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOperator.class); - private static final String OPERATOR_NAME = "CombineGroupByOperator"; +public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class); + private static final String OPERATOR_NAME = "GroupByCombineOperator"; // Use a higher limit for groups stored across segments. For most cases, most groups from each segment should be the // same, thus the total number of groups across segments should be equal or slightly higher than the number of groups @@ -69,7 +73,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc private final int _innerSegmentNumGroupsLimit; private final int _interSegmentNumGroupsLimit; - public CombineGroupByOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, + public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long timeOutMs, int innerSegmentNumGroupsLimit) { _operators = operators; _queryContext = queryContext; @@ -208,16 +212,7 @@ public class CombineGroupByOperator extends BaseOperator<IntermediateResultsBloc } // Set the execution statistics. - ExecutionStatistics executionStatistics = new ExecutionStatistics(); - for (Operator operator : _operators) { - executionStatistics.merge(operator.getExecutionStatistics()); - } - mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned()); - mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter()); - mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter()); - mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed()); - mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched()); - mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs()); + CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we // are comparing number of groups across segments with the groups limit for each segment. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java similarity index 88% rename from pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java rename to pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index f0001af..627e9f4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.core.operator; +package org.apache.pinot.core.operator.combine; import java.util.ArrayList; import java.util.Iterator; @@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.ConcurrentIndexedTable; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; @@ -53,15 +54,15 @@ import org.slf4j.LoggerFactory; /** - * The <code>CombineGroupByOrderByOperator</code> class is the operator to combine aggregation results with group-by and order by. + * Combine operator for aggregation group-by queries with SQL semantic. + * TODO: + * - Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using all threads + * - Try to extend BaseCombineOperator to reduce duplicate code */ -// TODO: this class has a lot of duplication with {@link CombineGroupByOperator}. -// These 2 classes can be combined into one -// For the first iteration of Order By support, these will be separate @SuppressWarnings("rawtypes") -public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResultsBlock> { - private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOrderByOperator.class); - private static final String OPERATOR_NAME = "CombineGroupByOrderByOperator"; +public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResultsBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class); + private static final String OPERATOR_NAME = "GroupByOrderByCombineOperator"; private final List<Operator> _operators; private final QueryContext _queryContext; @@ -72,7 +73,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu private DataSchema _dataSchema; private ConcurrentIndexedTable _indexedTable; - public CombineGroupByOrderByOperator(List<Operator> operators, QueryContext queryContext, + public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long timeOutMs) { _operators = operators; _queryContext = queryContext; @@ -220,16 +221,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu } // Set the execution statistics. 
- ExecutionStatistics executionStatistics = new ExecutionStatistics(); - for (Operator operator : _operators) { - executionStatistics.merge(operator.getExecutionStatistics()); - } - mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned()); - mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter()); - mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter()); - mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed()); - mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched()); - mergedBlock.setNumTotalDocs(executionStatistics.getNumTotalDocs()); + CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); if (_indexedTable.size() >= _indexedTableCapacity) { mergedBlock.setNumGroupsLimitReached(true); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java new file mode 100644 index 0000000..07aad33 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOnlyCombineOperator.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + + +/** + * Combine operator for selection only queries. + * <p>For query with LIMIT 0, directly use main thread to process one segment to get the data schema of the query. + * <p>Query can be early-terminated when enough documents have been collected to fulfill the LIMIT requirement. + * <p>NOTE: Selection order-by query with LIMIT 0 is treated as selection only query. 
+ */ +@SuppressWarnings("rawtypes") +public class SelectionOnlyCombineOperator extends BaseCombineOperator { + private static final String OPERATOR_NAME = "SelectionOnlyCombineOperator"; + + private final int _numRowsToKeep; + + public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService, long timeOutMs) { + super(operators, queryContext, executorService, timeOutMs); + _numRowsToKeep = queryContext.getLimit(); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + // For LIMIT 0 query, only process one segment to get the data schema + if (_numRowsToKeep == 0) { + IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(0).nextBlock(); + CombineOperatorUtils.setExecutionStatistics(resultsBlock, _operators); + return resultsBlock; + } + + return super.getNextBlock(); + } + + @Override + protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) { + Collection<Object[]> selectionResult = resultsBlock.getSelectionResult(); + assert selectionResult != null; + return selectionResult.size() == _numRowsToKeep; + } + + @Override + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { + DataSchema mergedDataSchema = mergedBlock.getDataSchema(); + DataSchema dataSchemaToMerge = blockToMerge.getDataSchema(); + assert mergedDataSchema != null && dataSchemaToMerge != null; + if (!mergedDataSchema.equals(dataSchemaToMerge)) { + String errorMessage = String + .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge", + mergedDataSchema, dataSchemaToMerge); + // NOTE: This is segment level log, so log at debug level to prevent flooding the log. + LOGGER.debug(errorMessage); + mergedBlock + .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); + return; + } + + Collection<Object[]> mergedRows = mergedBlock.getSelectionResult(); + Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult(); + assert mergedRows != null && rowsToMerge != null; + SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java new file mode 100644 index 0000000..af5b18e --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.combine; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.DataSourceMetadata; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.SelectionOrderByOperator; +import org.apache.pinot.core.query.exception.EarlyTerminationException; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.core.util.trace.TraceRunnable; + + +/** + * Combine operator for selection order-by queries. + * <p>When the first order-by expression is an identifier (column), skip processing the segments if possible based on + * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement. + * <ul> + * <li>1. Sort all the segments by the column min/max value</li> + * <li>2. Keep processing segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li> + * <li>3. 
Skip processing the segments that cannot add values to the final result</li> + * </ul> + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SelectionOrderByCombineOperator extends BaseCombineOperator { + private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator"; + + // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this + // special IntermediateResultsBlock into the BlockingQueue to awake the main thread + private static final IntermediateResultsBlock LAST_RESULTS_BLOCK = + new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), + Collections.emptyList()); + + private final int _numRowsToKeep; + + public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService, long timeOutMs) { + super(operators, queryContext, executorService, timeOutMs); + _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions(); + assert orderByExpressions != null; + if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) { + return minMaxValueBasedCombine(); + } else { + return super.getNextBlock(); + } + } + + private IntermediateResultsBlock minMaxValueBasedCombine() { + long startTimeMs = System.currentTimeMillis(); + long endTimeMs = startTimeMs + _timeOutMs; + + List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions(); + assert orderByExpressions != null; + int numOrderByExpressions = orderByExpressions.size(); + assert numOrderByExpressions > 0; + OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0); + assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER; + String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier(); + boolean asc = firstOrderByExpression.isAsc(); + + int numOperators = _operators.size(); + List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators); + for (Operator operator : _operators) { + minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn)); + } + try { + if (asc) { + // For ascending order, sort on column min value in ascending order + minMaxValueContexts.sort((o1, o2) -> { + // Put segments without column min value in the front because we always need to process them + if (o1._minValue == null) { + return o2._minValue == null ? 0 : -1; + } + if (o2._minValue == null) { + return 1; + } + return o1._minValue.compareTo(o2._minValue); + }); + } else { + // For descending order, sort on column max value in descending order + minMaxValueContexts.sort((o1, o2) -> { + // Put segments without column max value in the front because we always need to process them + if (o1._maxValue == null) { + return o2._maxValue == null ? 
0 : -1; + } + if (o2._maxValue == null) { + return 1; + } + return o2._maxValue.compareTo(o1._maxValue); + }); + } + } catch (Exception e) { + // Fall back to the default combine (process all segments) when segments have different data types for the first + // order-by column + LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine", + firstOrderByColumn); + return super.getNextBlock(); + } + + int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators); + AtomicReference<Comparable> globalBoundaryValue = new AtomicReference<>(); + + // Use a BlockingQueue to store the per-segment result + BlockingQueue<IntermediateResultsBlock> blockingQueue = new ArrayBlockingQueue<>(numOperators); + // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue) + AtomicInteger numOperatorsSkipped = new AtomicInteger(); + // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread + // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is + // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined + // behavior (even JVM crash) when processing queries against it. + Phaser phaser = new Phaser(1); + + Future[] futures = new Future[numThreads]; + for (int i = 0; i < numThreads; i++) { + int threadIndex = i; + futures[i] = _executorService.submit(new TraceRunnable() { + @Override + public void runJob() { + try { + // Register the thread to the phaser + // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that + // means the query execution has finished, and the main thread has deregistered itself and returned + // the result. Directly return as no execution result will be taken. + if (phaser.register() < 0) { + return; + } + + // Keep a boundary value for the thread + // NOTE: The thread boundary value can be different from the global boundary value because thread boundary + // value is updated after processing the segment, while global boundary value is updated after the + // segment result is merged. 
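The combination of the global and thread-local boundary values described above reduces to a "tighter bound wins" rule: for ascending order the smaller boundary is stronger (more segments satisfy minValue > boundary and can be skipped), and for descending order the larger one. A minimal sketch of that selection, with raw types matching the surrounding code and the class/method names assumed:

    final class BoundaryRuleSketch {
      // "Tighter bound wins": prefer the smaller boundary for ascending order
      // and the larger boundary for descending order, tolerating nulls for
      // threads that have not yet collected enough rows.
      @SuppressWarnings({"rawtypes", "unchecked"})
      static Comparable tighterBoundary(Comparable global, Comparable threadLocal, boolean asc) {
        if (global == null) {
          return threadLocal;
        }
        if (threadLocal == null) {
          return global;
        }
        int cmp = threadLocal.compareTo(global);
        return (asc ? cmp < 0 : cmp > 0) ? threadLocal : global;
      }
    }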
+ Comparable threadBoundaryValue = null; + + for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) { + // Calculate the boundary value from global boundary and thread boundary + Comparable boundaryValue = globalBoundaryValue.get(); + if (boundaryValue == null) { + boundaryValue = threadBoundaryValue; + } else { + if (threadBoundaryValue != null) { + if (asc) { + if (threadBoundaryValue.compareTo(boundaryValue) < 0) { + boundaryValue = threadBoundaryValue; + } + } else { + if (threadBoundaryValue.compareTo(boundaryValue) > 0) { + boundaryValue = threadBoundaryValue; + } + } + } + } + + // Check if the segment can be skipped + MinMaxValueContext minMaxValueContext = minMaxValueContexts.get(operatorIndex); + if (boundaryValue != null) { + if (asc) { + // For ascending order, no need to process more segments if the column min value is larger than the + // boundary value, or is equal to the boundary value and the there is only one order-by expression + if (minMaxValueContext._minValue != null) { + int result = minMaxValueContext._minValue.compareTo(boundaryValue); + if (result > 0 || (result == 0 && numOrderByExpressions == 1)) { + numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads); + blockingQueue.offer(LAST_RESULTS_BLOCK); + return; + } + } + } else { + // For descending order, no need to process more segments if the column max value is smaller than the + // boundary value, or is equal to the boundary value and the there is only one order-by expression + if (minMaxValueContext._maxValue != null) { + int result = minMaxValueContext._maxValue.compareTo(boundaryValue); + if (result < 0 || (result == 0 && numOrderByExpressions == 1)) { + numOperatorsSkipped.getAndAdd((numOperators - operatorIndex - 1) / numThreads); + blockingQueue.offer(LAST_RESULTS_BLOCK); + return; + } + } + } + } + + // Process the segment + try { + IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock(); + PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult(); + if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { + // Segment result has enough rows, update the boundary value + assert selectionResult.peek() != null; + Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0]; + if (boundaryValue == null) { + boundaryValue = segmentBoundaryValue; + } else { + if (asc) { + if (segmentBoundaryValue.compareTo(boundaryValue) < 0) { + boundaryValue = segmentBoundaryValue; + } + } else { + if (segmentBoundaryValue.compareTo(boundaryValue) > 0) { + boundaryValue = segmentBoundaryValue; + } + } + } + } + threadBoundaryValue = boundaryValue; + blockingQueue.offer(resultsBlock); + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, + _queryContext, e); + blockingQueue.offer(new IntermediateResultsBlock(e)); + return; + } + } + } finally { + phaser.arriveAndDeregister(); + } + } + }); + } + + IntermediateResultsBlock mergedBlock = null; + try { + int numBlocksMerged = 0; + while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) { + IntermediateResultsBlock blockToMerge = + blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (blockToMerge == null) { + // Query 
times out, skip merging the remaining results blocks + LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, + _queryContext); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); + break; + } + if (blockToMerge.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip merging the remaining results blocks and directly return + // the exception + mergedBlock = blockToMerge; + break; + } + if (mergedBlock == null) { + mergedBlock = blockToMerge; + } else { + if (blockToMerge != LAST_RESULTS_BLOCK) { + mergeResultsBlocks(mergedBlock, blockToMerge); + } + } + numBlocksMerged++; + + // Update the boundary value if enough rows are collected + PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult(); + if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { + assert selectionResult.peek() != null; + globalBoundaryValue.set((Comparable) selectionResult.peek()[0]); + } + } + } catch (Exception e) { + LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); + } finally { + // Cancel all ongoing jobs + for (Future future : futures) { + if (!future.isDone()) { + future.cancel(true); + } + } + // Deregister the main thread and wait until all threads are done + phaser.awaitAdvance(phaser.arriveAndDeregister()); + } + + CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators); + return mergedBlock; + } + + private static class MinMaxValueContext { + final SelectionOrderByOperator _operator; + final Comparable _minValue; + final Comparable _maxValue; + + MinMaxValueContext(SelectionOrderByOperator operator, String column) { + _operator = operator; + DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata(); + _minValue = dataSourceMetadata.getMinValue(); + _maxValue = dataSourceMetadata.getMaxValue(); + } + } + + @Override + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { + DataSchema mergedDataSchema = mergedBlock.getDataSchema(); + DataSchema dataSchemaToMerge = blockToMerge.getDataSchema(); + assert mergedDataSchema != null && dataSchemaToMerge != null; + if (!mergedDataSchema.equals(dataSchemaToMerge)) { + String errorMessage = String + .format("Data schema mismatch between merged block: %s and block to merge: %s, drop block to merge", + mergedDataSchema, dataSchemaToMerge); + // NOTE: This is segment level log, so log at debug level to prevent flooding the log.
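+ // NOTE: Dropping the mismatched block instead of failing the whole query keeps the merged result + // usable; the processing exception added below reports the dropped block to the caller.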
+ LOGGER.debug(errorMessage); + mergedBlock + .addToProcessingExceptions(QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); + return; + } + + PriorityQueue<Object[]> mergedRows = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult(); + Collection<Object[]> rowsToMerge = blockToMerge.getSelectionResult(); + assert mergedRows != null && rowsToMerge != null; + SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index e13ef48..637672c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -166,6 +166,10 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl }; } + public IndexSegment getIndexSegment() { + return _indexSegment; + } + @Override protected IntermediateResultsBlock getNextBlock() { if (_expressions.size() == _orderByExpressions.size()) { @@ -183,6 +187,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl // Fetch all the expressions and insert them into the priority queue BlockValSet[] blockValSets = new BlockValSet[numExpressions]; + int numColumnsProjected = _transformOperator.getNumColumnsProjected(); TransformBlock transformBlock; while ((transformBlock = _transformOperator.nextBlock()) != null) { for (int i = 0; i < numExpressions; i++) { @@ -191,12 +196,12 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl } RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets); int numDocsFetched = transformBlock.getNumDocs(); - _numDocsScanned += numDocsFetched; for (int i = 0; i < numDocsFetched; i++) { SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep); } + _numDocsScanned += numDocsFetched; + _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected; } - _numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected(); // Create the data schema String[] columnNames = new String[numExpressions]; @@ -221,6 +226,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl // Fetch the order-by expressions and docIds and insert them into the priority queue BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions + 1]; + int numColumnsProjected = _transformOperator.getNumColumnsProjected(); TransformBlock transformBlock; while ((transformBlock = _transformOperator.nextBlock()) != null) { for (int i = 0; i < numOrderByExpressions; i++) { @@ -230,7 +236,6 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl blockValSets[numOrderByExpressions] = transformBlock.getBlockValueSet(BuiltInVirtualColumn.DOCID); RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets); int numDocsFetched = transformBlock.getNumDocs(); - _numDocsScanned += numDocsFetched; for (int i = 0; i < numDocsFetched; i++) { // NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values later // without creating extra rows or re-constructing the priority queue. 
We can change the values in-place @@ -239,6 +244,8 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl blockValueFetcher.getRow(i, row, 0); SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep); } + _numDocsScanned += numDocsFetched; + _numEntriesScannedPostFilter += numDocsFetched * numColumnsProjected; } // Copy the rows (shallow copy so that any modification will also be reflected to the priority queue) into a list, @@ -261,6 +268,7 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl for (ExpressionContext expressionContext : nonOrderByExpressions) { expressionContext.getColumns(columns); } + int numColumns = columns.size(); Map<String, DataSource> dataSourceMap = new HashMap<>(); for (String column : columns) { dataSourceMap.put(column, _indexSegment.getDataSource(column)); @@ -283,11 +291,9 @@ public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBl for (int i = 0; i < numDocsFetched; i++) { blockValueFetcher.getRow(i, rowList.get(rowBaseId + i), numOrderByExpressions); } + _numEntriesScannedPostFilter += numDocsFetched * numColumns; rowBaseId += numDocsFetched; } - _numEntriesScannedPostFilter = - (long) _numDocsScanned * _transformOperator.getNumColumnsProjected() + (long) numRows * transformOperator - .getNumColumnsProjected(); // Create the data schema String[] columnNames = new String[numExpressions]; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java index 71a40b3..4f01fea 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java @@ -26,10 +26,12 @@ import java.util.concurrent.Future; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.CombineGroupByOperator; -import org.apache.pinot.core.operator.CombineGroupByOrderByOperator; -import org.apache.pinot.core.operator.CombineOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator; +import org.apache.pinot.core.operator.combine.GroupByCombineOperator; +import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator; +import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator; +import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator; import org.apache.pinot.core.query.exception.BadQueryRequestException; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; @@ -158,18 +160,26 @@ public class CombinePlanNode implements PlanNode { } } - // TODO: use the same combine operator for both aggregation and selection query. 
- if (QueryContextUtils.isAggregationQuery(_queryContext) && _queryContext.getGroupByExpressions() != null) { - // Aggregation group-by query - QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions()); - // new Combine operator only when GROUP_BY_MODE explicitly set to SQL - if (queryOptions.isGroupByModeSQL()) { - return new CombineGroupByOrderByOperator(operators, _queryContext, _executorService, _timeOutMs); + if (QueryContextUtils.isAggregationQuery(_queryContext)) { + if (_queryContext.getGroupByExpressions() == null) { + // Aggregation only + return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + } else { + // Aggregation group-by + QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions()); + if (queryOptions.isGroupByModeSQL()) { + return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + } + return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit); } - return new CombineGroupByOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit); } else { - // Selection or aggregation only query - return new CombineOperator(operators, _queryContext, _executorService, _timeOutMs); + if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) { + // Selection only + return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + } else { + // Selection order-by + return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs); + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java deleted file mode 100644 index 80cf40e..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.query.reduce; - -import java.util.Collection; -import java.util.List; -import java.util.PriorityQueue; -import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.response.ProcessingException; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; -import org.apache.pinot.core.query.selection.SelectionOperatorUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The <code>CombineService</code> class provides the utility methods to combine {@link IntermediateResultsBlock}s. - */ -@SuppressWarnings({"ConstantConditions", "rawtypes", "unchecked"}) -public class CombineService { - private CombineService() { - } - - private static final Logger LOGGER = LoggerFactory.getLogger(CombineService.class); - - public static void mergeTwoBlocks(QueryContext queryContext, IntermediateResultsBlock mergedBlock, - IntermediateResultsBlock blockToMerge) { - // Combine processing exceptions. - List<ProcessingException> mergedProcessingExceptions = mergedBlock.getProcessingExceptions(); - List<ProcessingException> processingExceptionsToMerge = blockToMerge.getProcessingExceptions(); - if (mergedProcessingExceptions == null) { - mergedBlock.setProcessingExceptions(processingExceptionsToMerge); - } else if (processingExceptionsToMerge != null) { - mergedProcessingExceptions.addAll(processingExceptionsToMerge); - } - - // Combine result. - if (QueryContextUtils.isAggregationQuery(queryContext)) { - // NOTE: Aggregation group-by queries should not reach here. - assert queryContext.getGroupByExpressions() == null; - - // Combine aggregation-only result. - // Might be null if caught exception during query execution. - List<Object> aggregationResultToMerge = blockToMerge.getAggregationResult(); - if (aggregationResultToMerge == null) { - // No data in block to merge. - return; - } - - AggregationFunction[] mergedAggregationFunctions = mergedBlock.getAggregationFunctions(); - if (mergedAggregationFunctions == null) { - // No data in merged block. - mergedBlock.setAggregationFunctions(blockToMerge.getAggregationFunctions()); - mergedBlock.setAggregationResults(aggregationResultToMerge); - } else { - // Merge two blocks. - List<Object> mergedAggregationResult = mergedBlock.getAggregationResult(); - int numAggregationFunctions = mergedAggregationFunctions.length; - for (int i = 0; i < numAggregationFunctions; i++) { - mergedAggregationResult.set(i, - mergedAggregationFunctions[i].merge(mergedAggregationResult.get(i), aggregationResultToMerge.get(i))); - } - } - } else { - // Combine selection result. - - // Data schema will be null if exceptions caught during query processing. - // Result set size will be zero if no row matches the predicate. - DataSchema mergedBlockSchema = mergedBlock.getDataSchema(); - DataSchema blockToMergeSchema = blockToMerge.getDataSchema(); - Collection<Object[]> mergedBlockResultSet = mergedBlock.getSelectionResult(); - Collection<Object[]> blockToMergeResultSet = blockToMerge.getSelectionResult(); - - if (mergedBlockSchema == null || mergedBlockResultSet.size() == 0) { - // No data in merged block. - - // If block to merge schema is not null, set its data schema and result to the merged block. 
- if (blockToMergeSchema != null) { - mergedBlock.setDataSchema(blockToMergeSchema); - mergedBlock.setSelectionResult(blockToMergeResultSet); - } - } else { - // Some data in merged block. - - boolean isSelectionOrderBy = queryContext.getOrderByExpressions() != null; - int limit = queryContext.getLimit(); - - // No need to merge if already got enough rows for selection only. - if (!isSelectionOrderBy && mergedBlockResultSet.size() == limit) { - return; - } - - // Merge only if there are data in block to merge. - if (blockToMergeSchema != null && blockToMergeResultSet.size() > 0) { - if (mergedBlockSchema.isTypeCompatibleWith(blockToMergeSchema)) { - // Two blocks are mergeable. - - // Upgrade the merged block schema if necessary. - mergedBlockSchema.upgradeToCover(blockToMergeSchema); - - // Merge two blocks. - if (isSelectionOrderBy) { - // Combine selection order-by. - SelectionOperatorUtils - .mergeWithOrdering((PriorityQueue<Object[]>) mergedBlockResultSet, blockToMergeResultSet, - queryContext.getOffset() + limit); - } else { - // Combine selection only. - SelectionOperatorUtils.mergeWithoutOrdering(mergedBlockResultSet, blockToMergeResultSet, limit); - } - mergedBlock.setSelectionResult(mergedBlockResultSet); - } else { - // Two blocks are not mergeable. - - String errorMessage = "Data schema inconsistency between merged block schema: " + mergedBlockSchema - + " and block to merge schema: " + blockToMergeSchema + ", drop block to merge"; - LOGGER.info(errorMessage); - mergedBlock.addToProcessingExceptions( - QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage)); - } - } - } - } - } -} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java similarity index 79% rename from pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java rename to pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java index 3a670fe..bfbb6c4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/CombineSlowOperatorsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineSlowOperatorsTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.core.operator; +package org.apache.pinot.core.operator.combine; import java.util.ArrayList; import java.util.List; @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.query.exception.EarlyTerminationException; @@ -60,30 +62,40 @@ public class CombineSlowOperatorsTest { } @Test - public void testCombineOperator() { + public void testSelectionOnlyCombineOperator() { List<Operator> operators = getOperators(); - CombineOperator combineOperator = - new CombineOperator(operators, QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), - _executorService, TIMEOUT_MS); + SelectionOnlyCombineOperator combineOperator = new SelectionOnlyCombineOperator(operators, + QueryContextConverterUtils.getQueryContextFromPQL("SELECT * FROM table"), _executorService, TIMEOUT_MS); + testCombineOperator(operators, combineOperator); + } + + // NOTE: Skip the test for SelectionOrderByCombineOperator because it requires SelectionOrderByOperator for the early + // termination optimization. + + @Test + public void testAggregationOnlyCombineOperator() { + List<Operator> operators = getOperators(); + AggregationOnlyCombineOperator combineOperator = new AggregationOnlyCombineOperator(operators, + QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table"), _executorService, TIMEOUT_MS); testCombineOperator(operators, combineOperator); } @Test - public void testCombineGroupByOperator() { + public void testGroupByCombineOperator() { List<Operator> operators = getOperators(); - CombineGroupByOperator combineGroupByOperator = new CombineGroupByOperator(operators, + GroupByCombineOperator combineOperator = new GroupByCombineOperator(operators, QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"), _executorService, TIMEOUT_MS, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); - testCombineOperator(operators, combineGroupByOperator); + testCombineOperator(operators, combineOperator); } @Test - public void testCombineGroupByOrderByOperator() { + public void testGroupByOrderByCombineOperator() { List<Operator> operators = getOperators(); - CombineGroupByOrderByOperator combineGroupByOrderByOperator = new CombineGroupByOrderByOperator(operators, + GroupByOrderByCombineOperator combineOperator = new GroupByOrderByCombineOperator(operators, QueryContextConverterUtils.getQueryContextFromPQL("SELECT COUNT(*) FROM table GROUP BY column"), _executorService, TIMEOUT_MS); - testCombineOperator(operators, combineGroupByOrderByOperator); + testCombineOperator(operators, combineOperator); } /** @@ -151,7 +163,7 @@ public class CombineSlowOperatorsTest { @Override public ExecutionStatistics getExecutionStatistics() { - return new ExecutionStatistics(); + return new ExecutionStatistics(0, 0, 0, 0); } } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java new file mode 100644 index 0000000..ac4ecf5 --- /dev/null +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.data.readers.GenericRowRecordReader; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.plan.CombinePlanNode; +import org.apache.pinot.core.plan.PlanNode; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Test for {@link SelectionOnlyCombineOperator} and {@link SelectionOrderByCombineOperator}. 
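+ * <p>Segments are created with overlapping value ranges (with NUM_RECORDS_PER_SEGMENT = 100, segment i + * holds INT values in [i * 50, i * 50 + 99]), so the column min/max metadata gives the order-by + * combine a chance to skip most of the segments.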
+ */ +public class SelectionCombineOperatorTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SelectionCombineEarlyTerminationTest"); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String SEGMENT_NAME_PREFIX = "testSegment_"; + + // Create (MAX_NUM_THREADS_PER_QUERY * 2) segments so that each thread needs to process 2 segments + private static final int NUM_SEGMENTS = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 2; + private static final int NUM_RECORDS_PER_SEGMENT = 100; + + private static final String INT_COLUMN = "intColumn"; + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + private static final Schema SCHEMA = + new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build(); + + private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2(); + private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); + + private List<IndexSegment> _indexSegments; + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + _indexSegments = new ArrayList<>(NUM_SEGMENTS); + for (int i = 0; i < NUM_SEGMENTS; i++) { + _indexSegments.add(createSegment(i)); + } + } + + private IndexSegment createSegment(int index) + throws Exception { + int baseValue = index * NUM_RECORDS_PER_SEGMENT / 2; + List<GenericRow> records = new ArrayList<>(NUM_RECORDS_PER_SEGMENT); + for (int i = 0; i < NUM_RECORDS_PER_SEGMENT; i++) { + GenericRow record = new GenericRow(); + record.putValue(INT_COLUMN, baseValue + i); + records.add(record); + } + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + String segmentName = SEGMENT_NAME_PREFIX + index; + segmentGeneratorConfig.setSegmentName(segmentName); + segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); + driver.build(); + + return ImmutableSegmentLoader.load(new File(TEMP_DIR, segmentName), ReadMode.mmap); + } + + @Test + public void testSelectionLimit0() { + IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable LIMIT 0"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + assertNotNull(combineResult.getSelectionResult()); + assertTrue(combineResult.getSelectionResult().isEmpty()); + assertEquals(combineResult.getNumDocsScanned(), 0); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), 0); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + assertEquals(combineResult.getNumSegmentsMatched(), 0); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + } + + @Test + public void testSelectionOnly() { + IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + assertNotNull(combineResult.getSelectionResult()); + assertEquals(combineResult.getSelectionResult().size(), 10); + // Should early-terminate after processing the 
result of the first segment. Each thread should process at most 1 + // segment. + long numDocsScanned = combineResult.getNumDocsScanned(); + assertTrue(numDocsScanned >= 10 && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * 10); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + int numSegmentsMatched = combineResult.getNumSegmentsMatched(); + assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + + combineResult = getCombineResult("SELECT * FROM testTable LIMIT 10000"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + assertNotNull(combineResult.getSelectionResult()); + assertEquals(combineResult.getSelectionResult().size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + // Should not early-terminate + numDocsScanned = combineResult.getNumDocsScanned(); + assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + } + + @Test + public void testSelectionOrderBy() { + IntermediateResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult(); + assertNotNull(selectionResult); + assertEquals(selectionResult.size(), 10); + int expectedValue = 9; + while (!selectionResult.isEmpty()) { + assertEquals((int) selectionResult.poll()[0], expectedValue--); + } + // Should early-terminate after processing the result of the first segment. Each thread should process at most 1 + // segment. 
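+ // (Lower bound: at least one segment is fully scanned; upper bound: each worker thread may finish + // its first segment before the boundary value lets it skip the rest.)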
+ long numDocsScanned = combineResult.getNumDocsScanned(); + assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT + && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + int numSegmentsMatched = combineResult.getNumSegmentsMatched(); + assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + + combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult(); + assertNotNull(selectionResult); + assertEquals(selectionResult.size(), 10); + expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40; + while (!selectionResult.isEmpty()) { + assertEquals((int) selectionResult.poll()[0], expectedValue++); + } + // Should early-terminate after processing the result of the first segment. Each thread should process at most 1 + // segment. + numDocsScanned = combineResult.getNumDocsScanned(); + assertTrue(numDocsScanned >= NUM_RECORDS_PER_SEGMENT + && numDocsScanned <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY * NUM_RECORDS_PER_SEGMENT); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + numSegmentsMatched = combineResult.getNumSegmentsMatched(); + assertTrue(numSegmentsMatched >= 1 && numSegmentsMatched <= CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + + combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000"); + assertEquals(combineResult.getDataSchema(), + new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT})); + selectionResult = (PriorityQueue<Object[]>) combineResult.getSelectionResult(); + assertNotNull(selectionResult); + assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + // Should not early-terminate + numDocsScanned = combineResult.getNumDocsScanned(); + assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + assertEquals(combineResult.getNumEntriesScannedInFilter(), 0); + assertEquals(combineResult.getNumEntriesScannedPostFilter(), numDocsScanned); + assertEquals(combineResult.getNumSegmentsProcessed(), NUM_SEGMENTS); + assertEquals(combineResult.getNumSegmentsMatched(), NUM_SEGMENTS); + assertEquals(combineResult.getNumTotalDocs(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT); + } + + private IntermediateResultsBlock getCombineResult(String query) { + QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query); + List<PlanNode> planNodes = new ArrayList<>(NUM_SEGMENTS); + for (IndexSegment indexSegment : _indexSegments) { + planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext)); + } + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, 
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + return combinePlanNode.run().nextBlock(); + } + + @AfterClass + public void tearDown() + throws IOException { + for (IndexSegment indexSegment : _indexSegments) { + indexSegment.destroy(); + } + FileUtils.deleteDirectory(TEMP_DIR); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java index 4ec7fbc..ff32e25 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java @@ -20,7 +20,6 @@ package org.apache.pinot.queries; import java.io.File; import java.net.URL; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -68,7 +67,6 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest { private static final String AVRO_DATA = "data" + File.separator + "test_data-sv.avro"; private static final String SEGMENT_NAME = "testTable_126164076_167572854"; private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "SingleValueQueriesTest"); - private static final int NUM_SEGMENTS = 2; // Hard-coded query filter. private static final String QUERY_FILTER = @@ -127,15 +125,7 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest { throws Exception { ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap); _indexSegment = immutableSegment; - int numSegments = getNumSegments(); - _indexSegments = new ArrayList<>(numSegments); - for (int i = 0; i < numSegments; i++) { - _indexSegments.add(immutableSegment); - } - } - - protected int getNumSegments() { - return NUM_SEGMENTS; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); } @AfterClass diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java deleted file mode 100644 index bca78a0..0000000 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.queries; - -import org.apache.pinot.common.response.broker.BrokerResponseNative; -import org.apache.pinot.core.operator.CombineOperator; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - - -/** - * Early termination test for selection-only queries. - */ -public class SelectionOnlyEarlyTerminationTest extends BaseSingleValueQueriesTest { - private static final int NUM_DOCS_PER_SEGMENT = 30000; - private static final int NUM_SERVERS = 2; - - /** - * In order to ensure each thread is executing more than 1 segment, this test is against - * (2 * MAX_NUM_THREADS_PER_QUERY) segments per server. - */ - @Override - protected int getNumSegments() { - return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2; - } - - /** - * With early termination, selection-only query is scheduled with {@link CombineOperator#MAX_NUM_THREADS_PER_QUERY} - * threads per server, and the total number of segments matched (segments with non-zero documents scanned) should be - * the same as the total number of threads for each server. - */ - @Test - public void testSelectOnlyQuery() { - int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY; - int numSegmentsPerServer = getNumSegments(); - - // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480 - for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) { - String query = String.format("SELECT column1, column7, column9, column6 FROM testTable LIMIT %d", limit); - int numColumnsInSelection = 4; - BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); - assertNotNull(brokerResponse.getSelectionResults()); - assertNull(brokerResponse.getResultTable()); - assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS); - // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second - // phase merge early terminates, the operators might not finish scanning the documents - long numSegmentsMatched = brokerResponse.getNumSegmentsMatched(); - assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS); - long numDocsScanned = brokerResponse.getNumDocsScanned(); - assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection); - // Total number of documents should not be affected by early-termination - assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - - brokerResponse = getBrokerResponseForSqlQuery(query); - assertNull(brokerResponse.getSelectionResults()); - assertNotNull(brokerResponse.getResultTable()); - assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS); - // NOTE: 'numSegmentsMatched' and 'numDocsScanned' could be in a range because when the CombineOperator second - // phase merge early terminates, the operators might not finish scanning the documents - numSegmentsMatched = brokerResponse.getNumSegmentsMatched(); - assertTrue(numSegmentsMatched >= NUM_SERVERS && numSegmentsMatched <= numThreadsPerServer * NUM_SERVERS); - numDocsScanned = brokerResponse.getNumDocsScanned(); - 
assertTrue(numDocsScanned >= NUM_SERVERS * limit && numDocsScanned <= numThreadsPerServer * NUM_SERVERS * limit); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), numDocsScanned * numColumnsInSelection); - // Total number of documents should not be affected by early-termination - assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - } - } - - /** - * Without early termination, selection order-by query should hit all segments. - */ - @Test - public void testSelectWithOrderByQuery() { - int numSegmentsPerServer = getNumSegments(); - String query = "SELECT column11, column18, column1 FROM testTable ORDER BY column11"; - BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); - assertNotNull(brokerResponse.getSelectionResults()); - assertNull(brokerResponse.getResultTable()); - assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), - brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - - brokerResponse = getBrokerResponseForSqlQuery(query); - assertNull(brokerResponse.getSelectionResults()); - assertNotNull(brokerResponse.getResultTable()); - assertEquals(brokerResponse.getNumSegmentsProcessed(), numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getNumSegmentsMatched(), numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getNumDocsScanned(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); - // numDocsScanned * (1 order-by columns + 1 docId column) + 10 * (2 non-order-by columns) per segment - assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), - brokerResponse.getNumDocsScanned() * 2 + 20 * numSegmentsPerServer * NUM_SERVERS); - assertEquals(brokerResponse.getTotalDocs(), numSegmentsPerServer * NUM_SERVERS * NUM_DOCS_PER_SEGMENT); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org