This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 27b61fe6 Extract more common logic in combine operators (#6696) 27b61fe6 is described below commit 27b61fe6a338b1363efb64a7fed87d95cc793f8a Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Mar 22 10:34:57 2021 -0700 Extract more common logic in combine operators (#6696) Extracts the phaser logic to the `BaseCombineOperator.getNextBlock()` to reduce the duplicate code. Extends `StreamingSelectionOnlyCombineOperator` from `BaseCombineOperator` --- .../core/operator/combine/BaseCombineOperator.java | 197 +++++++------- .../operator/combine/GroupByCombineOperator.java | 102 +++---- .../combine/GroupByOrderByCombineOperator.java | 87 ++---- ...xValueBasedSelectionOrderByCombineOperator.java | 292 +++++++++++---------- .../core/operator/combine/MinMaxValueContext.java | 36 --- .../combine/SelectionOrderByCombineOperator.java | 72 +---- .../StreamingSelectionOnlyCombineOperator.java | 176 +++++-------- 7 files changed, 389 insertions(+), 573 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 417f449..f57efb3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -19,10 +19,10 @@ package org.apache.pinot.core.operator.combine; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,42 +50,41 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class); protected final List<Operator> _operators; + protected final int _numOperators; protected final QueryContext _queryContext; protected final ExecutorService _executorService; protected final long _endTimeMs; - protected final int _numOperators; - // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread - // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is - // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined - // behavior (even JVM crash) when processing queries against it. 
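
For context, the phaser handshake consolidated by this commit is the standard java.util.concurrent.Phaser main-thread/worker pattern: workers register before doing work and deregister when done; the main thread deregisters itself and then waits for the phase to advance, which only happens once every worker has deregistered. A minimal, self-contained sketch of the same handshake (class name and worker body are illustrative, not part of this commit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;

    public class PhaserHandshakeSketch {
      public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // main thread is the initial registered party
        for (int i = 0; i < 4; i++) {
          executorService.submit(() -> {
            // A negative return value means the phaser has terminated: the main
            // thread has already deregistered and returned, so any result produced
            // here would be dropped
            if (phaser.register() < 0) {
              return;
            }
            try {
              // per-segment query execution would happen here
            } finally {
              phaser.arriveAndDeregister();
            }
          });
        }
        // Deregister the main thread, then block until every registered worker has
        // deregistered, so no worker can touch released segments afterwards
        phaser.awaitAdvance(phaser.arriveAndDeregister());
        executorService.shutdown();
      }
    }

In the diff below, the merge loop runs on the main thread between the submit loop and the awaitAdvance call; only after awaitAdvance returns is it safe to release the segment references.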
- protected final Phaser _phaser = new Phaser(1); - // Use a _blockingQueue to store the per-segment result - protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue; - private final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0); - protected int _numThreads; - protected Future[] _futures; - - public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, - long endTimeMs) { + protected final int _numThreads; + protected final Future[] _futures; + // Use a _blockingQueue to store the intermediate results blocks + protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue = new LinkedBlockingQueue<>(); + protected final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0); + + protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, + long endTimeMs, int numThreads) { _operators = operators; + _numOperators = _operators.size(); _queryContext = queryContext; _executorService = executorService; _endTimeMs = endTimeMs; - _numOperators = _operators.size(); - _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators); - _blockingQueue = new ArrayBlockingQueue<>(_numOperators); + _numThreads = numThreads; _futures = new Future[_numThreads]; } - public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, - long endTimeMs, int numThreads) { - this(operators, queryContext, executorService, endTimeMs); - _numThreads = numThreads; - _futures = new Future[_numThreads]; + protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, + long endTimeMs) { + this(operators, queryContext, executorService, endTimeMs, + CombineOperatorUtils.getNumThreadsForQuery(operators.size())); } @Override protected IntermediateResultsBlock getNextBlock() { + // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread + // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is + // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined + // behavior (even JVM crash) when processing queries against it. + Phaser phaser = new Phaser(1); + for (int i = 0; i < _numThreads; i++) { int threadIndex = i; _futures[i] = _executorService.submit(new TraceRunnable() { @@ -94,13 +93,41 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul ThreadTimer executionThreadTimer = new ThreadTimer(); executionThreadTimer.start(); - processSegments(threadIndex); + // Register the thread to the phaser + // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that + // means the query execution has finished, and the main thread has deregistered itself and returned + // the result. Directly return as no execution result will be taken. 
+ if (phaser.register() < 0) { + return; + } + try { + processSegments(threadIndex); + } finally { + phaser.arriveAndDeregister(); + } - totalWorkerThreadCpuTimeNs.addAndGet(executionThreadTimer.stopAndGetThreadTimeNs()); + totalWorkerThreadCpuTimeNs.getAndAdd(executionThreadTimer.stopAndGetThreadTimeNs()); } }); } - IntermediateResultsBlock mergedBlock = mergeResultsFromSegments(); + + IntermediateResultsBlock mergedBlock; + try { + mergedBlock = mergeResults(); + } catch (Exception e) { + LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); + } finally { + // Cancel all ongoing jobs + for (Future future : _futures) { + if (!future.isDone()) { + future.cancel(true); + } + } + // Deregister the main thread and wait for all threads done + phaser.awaitAdvance(phaser.arriveAndDeregister()); + } + /* * TODO: setThreadTime logic can be put into CombineOperatorUtils.setExecutionStatistics(), * after we extends StreamingSelectionOnlyCombineOperator from BaseCombineOperator. @@ -111,92 +138,64 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul } /** - * processSegments will execute query on one or more segments in a single thread. + * Executes query on one or more segments in a worker thread. */ protected void processSegments(int threadIndex) { - try { - // Register the thread to the phaser - // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that - // means the query execution has finished, and the main thread has deregistered itself and returned - // the result. Directly return as no execution result will be taken. - if (_phaser.register() < 0) { - return; - } - - for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { - try { - IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); - if (isQuerySatisfied(resultsBlock)) { - // Query is satisfied, skip processing the remaining segments - _blockingQueue.offer(resultsBlock); - return; - } else { - _blockingQueue.offer(resultsBlock); - } - } catch (EarlyTerminationException e) { - // Early-terminated by interruption (canceled by the main thread) - return; - } catch (Exception e) { - // Caught exception, skip processing the remaining operators - LOGGER - .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext, - e); - _blockingQueue.offer(new IntermediateResultsBlock(e)); + for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { + try { + IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); + if (isQuerySatisfied(resultsBlock)) { + // Query is satisfied, skip processing the remaining segments + _blockingQueue.offer(resultsBlock); return; + } else { + _blockingQueue.offer(resultsBlock); } + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext, + e); + _blockingQueue.offer(new IntermediateResultsBlock(e)); + return; } - } finally { - _phaser.arriveAndDeregister(); } } /** - * mergeResultsFromSegments will merge 
multiple intermediate result blocks into a result block. + * Merges the results from the worker threads into a results block. */ - protected IntermediateResultsBlock mergeResultsFromSegments() { + protected IntermediateResultsBlock mergeResults() + throws Exception { IntermediateResultsBlock mergedBlock = null; - try { - int numBlocksMerged = 0; - while (numBlocksMerged < _numOperators) { - IntermediateResultsBlock blockToMerge = - _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (blockToMerge == null) { - // Query times out, skip merging the remaining results blocks - LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, - _queryContext); - mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, - new TimeoutException("Timed out while polling results block"))); - break; - } - if (blockToMerge.getProcessingExceptions() != null) { - // Caught exception while processing segment, skip merging the remaining results blocks and directly return - // the exception - mergedBlock = blockToMerge; - break; - } - if (mergedBlock == null) { - mergedBlock = blockToMerge; - } else { - mergeResultsBlocks(mergedBlock, blockToMerge); - } - numBlocksMerged++; - if (isQuerySatisfied(mergedBlock)) { - // Query is satisfied, skip merging the remaining results blocks - break; - } + int numBlocksMerged = 0; + while (numBlocksMerged < _numOperators) { + IntermediateResultsBlock blockToMerge = + _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (blockToMerge == null) { + // Query times out, skip merging the remaining results blocks + LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, + _queryContext); + return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); } - } catch (Exception e) { - LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); - mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } + if (blockToMerge.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip merging the remaining results blocks and directly return the + // exception + return blockToMerge; + } + if (mergedBlock == null) { + mergedBlock = blockToMerge; + } else { + mergeResultsBlocks(mergedBlock, blockToMerge); + } + numBlocksMerged++; + if (isQuerySatisfied(mergedBlock)) { + // Query is satisfied, skip merging the remaining results blocks + return mergedBlock; } - // Deregister the main thread and wait for all threads done - _phaser.awaitAdvance(_phaser.arriveAndDeregister()); } return mergedBlock; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index dadb23a..35395ef 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -49,7 +48,7 @@ import org.slf4j.LoggerFactory; * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using * all threads */ -@SuppressWarnings("rawtypes") +@SuppressWarnings({"rawtypes", "unchecked"}) public class GroupByCombineOperator extends BaseCombineOperator { private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class); private static final String OPERATOR_NAME = "GroupByCombineOperator"; @@ -70,11 +69,6 @@ public class GroupByCombineOperator extends BaseCombineOperator { private final int _numAggregationFunctions; // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished // _futures (try to interrupt the execution if it already started). - // Besides the CountDownLatch, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or - // interrupted) before the main thread returns. We need to ensure no execution left before the main thread returning - // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the - // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash) - // when executing queries against them. private final CountDownLatch _operatorLatch; public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, @@ -88,27 +82,20 @@ public class GroupByCombineOperator extends BaseCombineOperator { _aggregationFunctions = _queryContext.getAggregationFunctions(); assert _aggregationFunctions != null; _numAggregationFunctions = _aggregationFunctions.length; - int numOperators = _operators.size(); - _operatorLatch = new CountDownLatch(numOperators); + _operatorLatch = new CountDownLatch(_numOperators); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; } /** - * {@inheritDoc} - * - * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks into a - * map + * Executes query on one segment in a worker thread and merges the results into the results map. */ @Override protected void processSegments(int threadIndex) { try { - // Register the thread to the _phaser. - // If the _phaser is terminated (returning negative value) when trying to register the thread, that means the - // query execution has timed out, and the main thread has deregistered itself and returned the result. - // Directly return as no execution result will be taken. 
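
The group-by combiners keep their CountDownLatch (rather than polling the shared blocking queue) because their workers merge directly into a shared results structure; the main thread only needs to know when all workers are done or the query deadline has passed. In isolation, the await-with-deadline idiom used here looks roughly like this (helper name hypothetical):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Returns false when the workers did not finish before the query deadline,
    // in which case the caller returns a timeout error block instead of merging.
    static boolean awaitWorkers(CountDownLatch operatorLatch, long endTimeMs)
        throws InterruptedException {
      long timeoutMs = endTimeMs - System.currentTimeMillis();
      return operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

Note that each worker counts down in a finally block (as the diff shows), so a failing segment cannot leave the main thread blocked until the timeout.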
- if (_phaser.register() < 0) { - return; - } - IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock(); @@ -153,7 +140,6 @@ public class GroupByCombineOperator extends BaseCombineOperator { _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } finally { _operatorLatch.countDown(); - _phaser.arriveAndDeregister(); } } @@ -177,57 +163,39 @@ public class GroupByCombineOperator extends BaseCombineOperator { * </ul> */ @Override - protected IntermediateResultsBlock mergeResultsFromSegments() { - try { - long timeoutMs = _endTimeMs - System.currentTimeMillis(); - boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); - if (!opCompleted) { - // If this happens, the broker side should already timed out, just log the error and return - String errorMessage = String - .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs, - _queryContext); - LOGGER.error(errorMessage); - return new IntermediateResultsBlock(new TimeoutException(errorMessage)); - } - - // Trim the results map. - AggregationGroupByTrimmingService aggregationGroupByTrimmingService = - new AggregationGroupByTrimmingService(_queryContext); - List<Map<String, Object>> trimmedResults = - aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap); - IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true); + protected IntermediateResultsBlock mergeResults() + throws Exception { + long timeoutMs = _endTimeMs - System.currentTimeMillis(); + boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (!opCompleted) { + // If this happens, the broker side should already timed out, just log the error and return + String errorMessage = String + .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs, _queryContext); + LOGGER.error(errorMessage); + return new IntermediateResultsBlock(new TimeoutException(errorMessage)); + } - // Set the processing exceptions. - if (!_mergedProcessingExceptions.isEmpty()) { - mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); - } - // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we - // are comparing number of groups across segments with the groups limit for each segment. - if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) { - mergedBlock.setNumGroupsLimitReached(true); - } + // Trim the results map. + AggregationGroupByTrimmingService aggregationGroupByTrimmingService = + new AggregationGroupByTrimmingService(_queryContext); + List<Map<String, Object>> trimmedResults = + aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap); + IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true); - return mergedBlock; - } catch (Exception e) { - return new IntermediateResultsBlock(e); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - // Deregister the main thread and wait for all threads done - _phaser.awaitAdvance(_phaser.arriveAndDeregister()); + // Set the processing exceptions. 
+ if (!_mergedProcessingExceptions.isEmpty()) { + mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); + } + // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we + // are comparing number of groups across segments with the groups limit for each segment. + if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) { + mergedBlock.setNumGroupsLimitReached(true); } - } - @Override - protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { + return mergedBlock; } @Override - public String getOperatorName() { - return OPERATOR_NAME; + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 38f1a54..4cd518d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; @@ -68,11 +67,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>(); // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished // _futures (try to interrupt the execution if it already started). - // Besides the CountDownLatch, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or - // interrupted) before the main thread returns. We need to ensure no execution left before the main thread returning - // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the - // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash) - // when executing queries against them. private final CountDownLatch _operatorLatch; private DataSchema _dataSchema; private ConcurrentIndexedTable _indexedTable; @@ -95,23 +89,17 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { _operatorLatch = new CountDownLatch(numOperators); } + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + /** - * {@inheritDoc} - * - * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks - * into {@link org.apache.pinot.core.data.table.IndexedTable} + * Executes query on one segment in a worker thread and merges the results into the indexed table. */ @Override protected void processSegments(int threadIndex) { try { - // Register the thread to the _phaser. - // If the _phaser is terminated (returning negative value) when trying to register the thread, that means the - // query execution has timed out, and the main thread has deregistered itself and returned the result. - // Directly return as no execution result will be taken. 
- if (_phaser.register() < 0) { - return; - } - IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock(); @@ -164,7 +152,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } finally { _operatorLatch.countDown(); - _phaser.arriveAndDeregister(); } } @@ -182,52 +169,34 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { * </ul> */ @Override - protected IntermediateResultsBlock mergeResultsFromSegments() { - try { - long timeoutMs = _endTimeMs - System.currentTimeMillis(); - boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); - if (!opCompleted) { - // If this happens, the broker side should already timed out, just log the error and return - String errorMessage = String - .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, - _queryContext); - LOGGER.error(errorMessage); - return new IntermediateResultsBlock(new TimeoutException(errorMessage)); - } + protected IntermediateResultsBlock mergeResults() + throws Exception { + long timeoutMs = _endTimeMs - System.currentTimeMillis(); + boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (!opCompleted) { + // If this happens, the broker side should already timed out, just log the error and return + String errorMessage = String + .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs, + _queryContext); + LOGGER.error(errorMessage); + return new IntermediateResultsBlock(new TimeoutException(errorMessage)); + } - _indexedTable.finish(false); - IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable); + _indexedTable.finish(false); + IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable); - // Set the processing exceptions. - if (!_mergedProcessingExceptions.isEmpty()) { - mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); - } - - mergedBlock.setNumResizes(_indexedTable.getNumResizes()); - mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs()); - // TODO - set numGroupsLimitReached - return mergedBlock; - } catch (Exception e) { - return new IntermediateResultsBlock(e); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - // Deregister the main thread and wait for all threads done - _phaser.awaitAdvance(_phaser.arriveAndDeregister()); + // Set the processing exceptions. 
+ if (!_mergedProcessingExceptions.isEmpty()) { + mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); } - } - - @Override - protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { + mergedBlock.setNumResizes(_indexedTable.getNumResizes()); + mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs()); + // TODO - set numGroupsLimitReached + return mergedBlock; } @Override - public String getOperatorName() { - return OPERATOR_NAME; + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index 29359f6..e7df81c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -18,20 +18,22 @@ */ package org.apache.pinot.core.operator.combine; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.DataSourceMetadata; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.SelectionOrderByOperator; import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; @@ -67,11 +69,48 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine private final int _numRowsToKeep; private final List<MinMaxValueContext> _minMaxValueContexts; - public MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, - ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) { + MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService, long endTimeMs) { super(operators, queryContext, executorService, endTimeMs); - _minMaxValueContexts = minMaxValueContexts; _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); + + List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions(); + assert orderByExpressions != null; + int numOrderByExpressions = orderByExpressions.size(); + assert numOrderByExpressions > 0; + OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0); + assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER; + String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier(); + + _minMaxValueContexts = new ArrayList<>(_numOperators); + for (Operator operator : _operators) { + 
_minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn)); + } + if (firstOrderByExpression.isAsc()) { + // For ascending order, sort on column min value in ascending order + _minMaxValueContexts.sort((o1, o2) -> { + // Put segments without column min value in the front because we always need to process them + if (o1._minValue == null) { + return o2._minValue == null ? 0 : -1; + } + if (o2._minValue == null) { + return 1; + } + return o1._minValue.compareTo(o2._minValue); + }); + } else { + // For descending order, sort on column max value in descending order + _minMaxValueContexts.sort((o1, o2) -> { + // Put segments without column max value in the front because we always need to process them + if (o1._maxValue == null) { + return o2._maxValue == null ? 0 : -1; + } + if (o2._maxValue == null) { + return 1; + } + return o2._maxValue.compareTo(o1._maxValue); + }); + } } @Override @@ -95,106 +134,93 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER; boolean asc = firstOrderByExpression.isAsc(); - try { - // Register the thread to the _phaser - // NOTE: If the _phaser is terminated (returning negative value) when trying to register the thread, that - // means the query execution has finished, and the main thread has deregistered itself and returned - // the result. Directly return as no execution result will be taken. - if (_phaser.register() < 0) { - return; - } + // Keep a boundary value for the thread + // NOTE: The thread boundary value can be different from the global boundary value because thread boundary + // value is updated after processing the segment, while global boundary value is updated after the + // segment result is merged. + Comparable threadBoundaryValue = null; - // Keep a boundary value for the thread - // NOTE: The thread boundary value can be different from the global boundary value because thread boundary - // value is updated after processing the segment, while global boundary value is updated after the - // segment result is merged. 
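
The skip logic below rests on one invariant: for an ascending sort, a segment whose column min value is already past the boundary (the first order-by value of the worst row currently kept) cannot contribute any row, and a tie cannot help either when there is a single order-by expression. Isolated as a sketch (helper name hypothetical):

    // True if a segment can be skipped for an ascending ORDER BY. minValue is the
    // segment's column min from metadata; boundaryValue is the first order-by value
    // of the last row currently kept (null until enough rows are collected).
    static boolean canSkipAscending(Comparable minValue, Comparable boundaryValue,
        int numOrderByExpressions) {
      if (minValue == null || boundaryValue == null) {
        return false; // no metadata or no boundary yet: the segment must be processed
      }
      int result = minValue.compareTo(boundaryValue);
      return result > 0 || (result == 0 && numOrderByExpressions == 1);
    }

Descending order mirrors this check with the column max value.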
- Comparable threadBoundaryValue = null; - - for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { - // Calculate the boundary value from global boundary and thread boundary - Comparable boundaryValue = _globalBoundaryValue.get(); - if (boundaryValue == null) { - boundaryValue = threadBoundaryValue; - } else { - if (threadBoundaryValue != null) { - if (asc) { - if (threadBoundaryValue.compareTo(boundaryValue) < 0) { - boundaryValue = threadBoundaryValue; - } - } else { - if (threadBoundaryValue.compareTo(boundaryValue) > 0) { - boundaryValue = threadBoundaryValue; - } + for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { + // Calculate the boundary value from global boundary and thread boundary + Comparable boundaryValue = _globalBoundaryValue.get(); + if (boundaryValue == null) { + boundaryValue = threadBoundaryValue; + } else { + if (threadBoundaryValue != null) { + if (asc) { + if (threadBoundaryValue.compareTo(boundaryValue) < 0) { + boundaryValue = threadBoundaryValue; + } + } else { + if (threadBoundaryValue.compareTo(boundaryValue) > 0) { + boundaryValue = threadBoundaryValue; } } } + } - // Check if the segment can be skipped - MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex); - if (boundaryValue != null) { - if (asc) { - // For ascending order, no need to process more segments if the column min value is larger than the - // boundary value, or is equal to the boundary value and the there is only one order-by expression - if (minMaxValueContext._minValue != null) { - int result = minMaxValueContext._minValue.compareTo(boundaryValue); - if (result > 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); - _blockingQueue.offer(LAST_RESULTS_BLOCK); - return; - } + // Check if the segment can be skipped + MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex); + if (boundaryValue != null) { + if (asc) { + // For ascending order, no need to process more segments if the column min value is larger than the + // boundary value, or is equal to the boundary value and the there is only one order-by expression + if (minMaxValueContext._minValue != null) { + int result = minMaxValueContext._minValue.compareTo(boundaryValue); + if (result > 0 || (result == 0 && numOrderByExpressions == 1)) { + _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); + _blockingQueue.offer(LAST_RESULTS_BLOCK); + return; } - } else { - // For descending order, no need to process more segments if the column max value is smaller than the - // boundary value, or is equal to the boundary value and the there is only one order-by expression - if (minMaxValueContext._maxValue != null) { - int result = minMaxValueContext._maxValue.compareTo(boundaryValue); - if (result < 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); - _blockingQueue.offer(LAST_RESULTS_BLOCK); - return; - } + } + } else { + // For descending order, no need to process more segments if the column max value is smaller than the + // boundary value, or is equal to the boundary value and the there is only one order-by expression + if (minMaxValueContext._maxValue != null) { + int result = minMaxValueContext._maxValue.compareTo(boundaryValue); + if (result < 0 || (result == 0 && numOrderByExpressions == 1)) { + 
_numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads); + _blockingQueue.offer(LAST_RESULTS_BLOCK); + return; } } } + } - // Process the segment - try { - IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock(); - PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult(); - if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { - // Segment result has enough rows, update the boundary value - assert selectionResult.peek() != null; - Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0]; - if (boundaryValue == null) { - boundaryValue = segmentBoundaryValue; + // Process the segment + try { + IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock(); + PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult(); + if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { + // Segment result has enough rows, update the boundary value + assert selectionResult.peek() != null; + Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0]; + if (boundaryValue == null) { + boundaryValue = segmentBoundaryValue; + } else { + if (asc) { + if (segmentBoundaryValue.compareTo(boundaryValue) < 0) { + boundaryValue = segmentBoundaryValue; + } } else { - if (asc) { - if (segmentBoundaryValue.compareTo(boundaryValue) < 0) { - boundaryValue = segmentBoundaryValue; - } - } else { - if (segmentBoundaryValue.compareTo(boundaryValue) > 0) { - boundaryValue = segmentBoundaryValue; - } + if (segmentBoundaryValue.compareTo(boundaryValue) > 0) { + boundaryValue = segmentBoundaryValue; } } } - threadBoundaryValue = boundaryValue; - _blockingQueue.offer(resultsBlock); - } catch (EarlyTerminationException e) { - // Early-terminated by interruption (canceled by the main thread) - return; - } catch (Exception e) { - // Caught exception, skip processing the remaining operators - LOGGER - .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext, - e); - _blockingQueue.offer(new IntermediateResultsBlock(e)); - return; } + threadBoundaryValue = boundaryValue; + _blockingQueue.offer(resultsBlock); + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext, + e); + _blockingQueue.offer(new IntermediateResultsBlock(e)); + return; } - } finally { - _phaser.arriveAndDeregister(); } } @@ -212,55 +238,42 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine * </ul> */ @Override - protected IntermediateResultsBlock mergeResultsFromSegments() { + protected IntermediateResultsBlock mergeResults() + throws Exception { IntermediateResultsBlock mergedBlock = null; - try { - int numBlocksMerged = 0; - while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) { - IntermediateResultsBlock blockToMerge = - _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (blockToMerge == null) { - // Query times out, skip merging the remaining results blocks - LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, - _queryContext); - mergedBlock = new 
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, - new TimeoutException("Timed out while polling results block"))); - break; - } - if (blockToMerge.getProcessingExceptions() != null) { - // Caught exception while processing segment, skip merging the remaining results blocks and directly return - // the exception - mergedBlock = blockToMerge; - break; - } - if (mergedBlock == null) { - mergedBlock = blockToMerge; - } else { - if (blockToMerge != LAST_RESULTS_BLOCK) { - mergeResultsBlocks(mergedBlock, blockToMerge); - } - } - numBlocksMerged++; - - // Update the boundary value if enough rows are collected - PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult(); - if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { - assert selectionResult.peek() != null; - _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]); - } + int numBlocksMerged = 0; + while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) { + IntermediateResultsBlock blockToMerge = + _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (blockToMerge == null) { + // Query times out, skip merging the remaining results blocks + LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, + _queryContext); + mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); + break; } - } catch (Exception e) { - LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); - mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); + if (blockToMerge.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip merging the remaining results blocks and directly return + // the exception + mergedBlock = blockToMerge; + break; + } + if (mergedBlock == null) { + mergedBlock = blockToMerge; + } else { + if (blockToMerge != LAST_RESULTS_BLOCK) { + mergeResultsBlocks(mergedBlock, blockToMerge); } } - // Deregister the main thread and wait for all threads done - _phaser.awaitAdvance(_phaser.arriveAndDeregister()); + numBlocksMerged++; + + // Update the boundary value if enough rows are collected + PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult(); + if (selectionResult != null && selectionResult.size() == _numRowsToKeep) { + assert selectionResult.peek() != null; + _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]); + } } return mergedBlock; } @@ -286,4 +299,17 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine assert mergedRows != null && rowsToMerge != null; SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep); } + + private static class MinMaxValueContext { + final SelectionOrderByOperator _operator; + final Comparable _minValue; + final Comparable _maxValue; + + MinMaxValueContext(SelectionOrderByOperator operator, String column) { + _operator = operator; + DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata(); + _minValue = dataSourceMetadata.getMinValue(); + _maxValue = dataSourceMetadata.getMaxValue(); 
+ } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java deleted file mode 100644 index 3f76058..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.operator.combine; - -import org.apache.pinot.core.common.DataSourceMetadata; -import org.apache.pinot.core.operator.query.SelectionOrderByOperator; - - -public class MinMaxValueContext { - final SelectionOrderByOperator _operator; - final Comparable _minValue; - final Comparable _maxValue; - - MinMaxValueContext(SelectionOrderByOperator operator, String column) { - _operator = operator; - DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata(); - _minValue = dataSourceMetadata.getMinValue(); - _maxValue = dataSourceMetadata.getMaxValue(); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java index dfd34e9..ef8a7d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.operator.combine; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.PriorityQueue; @@ -27,7 +26,6 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.operator.query.SelectionOrderByOperator; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.OrderByExpressionContext; import org.apache.pinot.core.query.request.context.QueryContext; @@ -43,24 +41,16 @@ import org.slf4j.LoggerFactory; * skip processing some segments based on the column min/max value. Otherwise fall back to the default combine * (process all segments). 
*/ -@SuppressWarnings({"rawtypes", "unchecked"}) +@SuppressWarnings("rawtypes") public class SelectionOrderByCombineOperator extends BaseCombineOperator { private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOrderByCombineOperator.class); private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator"; - private final List<Operator> _operators; - private final QueryContext _queryContext; - private final ExecutorService _executorService; - private final long _endTimeMs; private final int _numRowsToKeep; public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long endTimeMs) { super(operators, queryContext, executorService, endTimeMs); - _operators = operators; - _queryContext = queryContext; - _executorService = executorService; - _endTimeMs = endTimeMs; _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); } @@ -83,62 +73,14 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator { List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions(); assert orderByExpressions != null; if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) { - return tryMinMaxValueBasedCombine(orderByExpressions); - } else { - // Fall back to the default combine (process all segments) when segments have different data types for the first - // order-by column - return super.getNextBlock(); - } - } - - private IntermediateResultsBlock tryMinMaxValueBasedCombine(List<OrderByExpressionContext> orderByExpressions) { - int numOrderByExpressions = orderByExpressions.size(); - assert numOrderByExpressions > 0; - OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0); - assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER; - String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier(); - boolean asc = firstOrderByExpression.isAsc(); - - int numOperators = _operators.size(); - List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators); - for (Operator operator : _operators) { - minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn)); - } - try { - if (asc) { - // For ascending order, sort on column min value in ascending order - minMaxValueContexts.sort((o1, o2) -> { - // Put segments without column min value in the front because we always need to process them - if (o1._minValue == null) { - return o2._minValue == null ? 0 : -1; - } - if (o2._minValue == null) { - return 1; - } - return o1._minValue.compareTo(o2._minValue); - }); - } else { - // For descending order, sort on column max value in descending order - minMaxValueContexts.sort((o1, o2) -> { - // Put segments without column max value in the front because we always need to process them - if (o1._maxValue == null) { - return o2._maxValue == null ? 0 : -1; - } - if (o2._maxValue == null) { - return 1; - } - return o2._maxValue.compareTo(o1._maxValue); - }); + try { + return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService, + _endTimeMs).getNextBlock(); + } catch (Exception e) { + LOGGER.warn("Caught exception while using min/max value based combine, using the default combine", e); } - } catch (Exception e) { - // Fall back to the default combine (process all segments) if there are any exceptions. 
- LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine", - firstOrderByColumn); - return super.getNextBlock(); } - - return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService, _endTimeMs, - minMaxValueContexts).getNextBlock(); + return super.getNextBlock(); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java index e84e084..869044d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java @@ -22,35 +22,29 @@ import io.grpc.stub.StreamObserver; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.operator.combine.CombineOperatorUtils; +import org.apache.pinot.core.operator.combine.BaseCombineOperator; import org.apache.pinot.core.query.exception.EarlyTerminationException; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; -import org.apache.pinot.core.util.trace.TraceRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Combine operator for selection only streaming queries. - * TODO: extend StreamingSelectionOnlyCombineOperator from BaseCombineOperator. 
*/ @SuppressWarnings({"rawtypes", "unchecked"}) -public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> { +public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator { private static final Logger LOGGER = LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class); private static final String OPERATOR_NAME = "StreamingSelectionOnlyCombineOperator"; @@ -59,19 +53,13 @@ public class StreamingSelectionOnlyCombineOperator extends BaseOperator<Intermed new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList()); - private final List<Operator> _operators; - private final QueryContext _queryContext; - private final ExecutorService _executorService; - private final long _endTimeMs; private final StreamObserver<Server.ServerResponse> _streamObserver; private final int _limit; + private final AtomicLong _numRowsCollected = new AtomicLong(); public StreamingSelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService, long endTimeMs, StreamObserver<Server.ServerResponse> streamObserver) { - _operators = operators; - _queryContext = queryContext; - _executorService = executorService; - _endTimeMs = endTimeMs; + super(operators, queryContext, executorService, endTimeMs); _streamObserver = streamObserver; _limit = queryContext.getLimit(); } @@ -82,109 +70,69 @@ public class StreamingSelectionOnlyCombineOperator extends BaseOperator<Intermed } @Override - protected IntermediateResultsBlock getNextBlock() { - int numOperators = _operators.size(); - int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators); - - // Use a BlockingQueue to store all the results blocks - BlockingQueue<IntermediateResultsBlock> blockingQueue = new LinkedBlockingQueue<>(); - // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread - // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is - // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined - // behavior (even JVM crash) when processing queries against it. - Phaser phaser = new Phaser(1); - - Future[] futures = new Future[numThreads]; - for (int i = 0; i < numThreads; i++) { - int threadIndex = i; - futures[i] = _executorService.submit(new TraceRunnable() { - @Override - public void runJob() { - try { - // Register the thread to the phaser - // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that - // means the query execution has finished, and the main thread has deregistered itself and returned - // the result. Directly return as no execution result will be taken. 
- if (phaser.register() < 0) { - return; - } - - int numRowsCollected = 0; - for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) { - Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex); - try { - IntermediateResultsBlock resultsBlock; - while ((resultsBlock = operator.nextBlock()) != null) { - Collection<Object[]> rows = resultsBlock.getSelectionResult(); - assert rows != null; - numRowsCollected += rows.size(); - blockingQueue.offer(resultsBlock); - if (numRowsCollected >= _limit) { - return; - } - } - blockingQueue.offer(LAST_RESULTS_BLOCK); - } catch (EarlyTerminationException e) { - // Early-terminated by interruption (canceled by the main thread) - return; - } catch (Exception e) { - // Caught exception, skip processing the remaining operators - LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, - _queryContext, e); - blockingQueue.offer(new IntermediateResultsBlock(e)); - return; - } - } - } finally { - phaser.arriveAndDeregister(); + protected void processSegments(int threadIndex) { + for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) { + Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex); + try { + IntermediateResultsBlock resultsBlock; + while ((resultsBlock = operator.nextBlock()) != null) { + Collection<Object[]> rows = resultsBlock.getSelectionResult(); + assert rows != null; + long numRowsCollected = _numRowsCollected.addAndGet(rows.size()); + _blockingQueue.offer(resultsBlock); + if (numRowsCollected >= _limit) { + return; } } - }); + _blockingQueue.offer(LAST_RESULTS_BLOCK); + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext, + e); + _blockingQueue.offer(new IntermediateResultsBlock(e)); + return; + } } + } - try { - int numRowsCollected = 0; - int numOperatorsFinished = 0; - while (numRowsCollected < _limit && numOperatorsFinished < numOperators) { - IntermediateResultsBlock resultsBlock = - blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (resultsBlock == null) { - // Query times out, skip streaming the remaining results blocks - LOGGER.error("Timed out while polling results block (query: {})", _queryContext); - return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, - new TimeoutException("Timed out while polling results block"))); - } - if (resultsBlock.getProcessingExceptions() != null) { - // Caught exception while processing segment, skip streaming the remaining results blocks and directly return - // the exception - return resultsBlock; - } - if (resultsBlock == LAST_RESULTS_BLOCK) { - numOperatorsFinished++; - continue; - } - DataSchema dataSchema = resultsBlock.getDataSchema(); - Collection<Object[]> rows = resultsBlock.getSelectionResult(); - assert dataSchema != null && rows != null; - numRowsCollected += rows.size(); - DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema); - _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable)); + @Override + protected IntermediateResultsBlock mergeResults() + throws Exception { + long numRowsCollected = 0; + int numOperatorsFinished = 0; + 
while (numRowsCollected < _limit && numOperatorsFinished < _numOperators) { + IntermediateResultsBlock resultsBlock = + _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (resultsBlock == null) { + // Query times out, skip streaming the remaining results blocks + LOGGER.error("Timed out while polling results block (query: {})", _queryContext); + return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); } - IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock(); - CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators); - return metadataBlock; - } catch (Exception e) { - LOGGER.error("Caught exception while streaming results blocks (query: {})", _queryContext, e); - return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e); - } finally { - // Cancel all ongoing jobs - for (Future future : futures) { - if (!future.isDone()) { - future.cancel(true); - } + if (resultsBlock.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip streaming the remaining results blocks and directly return + // the exception + return resultsBlock; + } + if (resultsBlock == LAST_RESULTS_BLOCK) { + numOperatorsFinished++; + continue; } - // Deregister the main thread and wait for all threads done - phaser.awaitAdvance(phaser.arriveAndDeregister()); + DataSchema dataSchema = resultsBlock.getDataSchema(); + Collection<Object[]> rows = resultsBlock.getSelectionResult(); + assert dataSchema != null && rows != null; + numRowsCollected += rows.size(); + DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema); + _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable)); } + // Return an empty results block for the metadata + return new IntermediateResultsBlock(); + } + + @Override + protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) { } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
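
Taken together, the commit leaves BaseCombineOperator as a template method: getNextBlock() owns thread fan-out, the phaser handshake, merging, and cancellation, while subclasses override only the per-segment and merge steps. A simplified outline of that shape (names illustrative, exception handling omitted):

    // Rough outline of the consolidated base class after this commit
    abstract class BaseCombineOperatorOutline {
      protected abstract void processSegments(int threadIndex); // worker-side work
      protected abstract Object mergeResults() throws Exception; // main-thread merge

      protected Object getNextBlock() throws Exception {
        // 1. submit _numThreads workers, each registering with the phaser and
        //    calling processSegments(threadIndex)
        // 2. call mergeResults() on the main thread
        // 3. in a finally block: cancel unfinished futures, then
        //    phaser.awaitAdvance(phaser.arriveAndDeregister())
        return null; // placeholder for the merged results block
      }
    }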