This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 04bdb9a397 Enhance early terminate for combine operator (#10988) 04bdb9a397 is described below commit 04bdb9a39743569aaacfcb07e2cc7eaa08854791 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Jun 27 17:55:17 2023 -0700 Enhance early terminate for combine operator (#10988) --- .../core/operator/combine/BaseCombineOperator.java | 17 ++++++++++++-- .../combine/BaseSingleBlockCombineOperator.java | 19 ++++------------ .../operator/combine/GroupByCombineOperator.java | 26 ++++++---------------- ...xValueBasedSelectionOrderByCombineOperator.java | 2 +- .../streaming/BaseStreamingCombineOperator.java | 18 +++++---------- .../streaming/StreamingGroupByCombineOperator.java | 24 ++++++-------------- 6 files changed, 39 insertions(+), 67 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 9390925bc9..b4babb28a3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -19,17 +19,21 @@ package org.apache.pinot.core.operator.combine; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.util.QueryMultiThreadingUtils; import org.apache.pinot.core.util.trace.TraceRunnable; @@ -48,9 +52,10 @@ import org.slf4j.LoggerFactory; * detects that the merged results can already satisfy the query, or the query is already errored out or timed out. */ @SuppressWarnings({"rawtypes"}) -public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends BaseOperator<T> { +public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends BaseOperator<BaseResultsBlock> { private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class); + protected final ResultsBlockMerger<T> _resultsBlockMerger; protected final List<Operator> _operators; protected final int _numOperators; protected final QueryContext _queryContext; @@ -58,11 +63,19 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba protected final int _numTasks; protected final Phaser _phaser; protected final Future[] _futures; + // Use an AtomicInteger to track the next operator to execute protected final AtomicInteger _nextOperatorId = new AtomicInteger(); + // Use a BlockingQueue to store the intermediate results blocks + protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>(); + // Use an AtomicReference to track the exception/error during segment processing + protected final AtomicReference<Throwable> _processingException = new AtomicReference<>(); + protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0); - protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) { + protected BaseCombineOperator(ResultsBlockMerger<T> 
resultsBlockMerger, List<Operator> operators, + QueryContext queryContext, ExecutorService executorService) { + _resultsBlockMerger = resultsBlockMerger; _operators = operators; _numOperators = _operators.size(); _queryContext = queryContext; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java index 7366a61b69..f3dd847fba 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java @@ -19,12 +19,8 @@ package org.apache.pinot.core.operator.combine; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; @@ -46,20 +42,12 @@ import org.slf4j.LoggerFactory; */ @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock> - extends BaseCombineOperator<BaseResultsBlock> { + extends BaseCombineOperator<T> { private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingleBlockCombineOperator.class); - // Use an AtomicInteger to track the next operator to execute - protected final AtomicInteger _nextOperatorId = new AtomicInteger(); - // Use a BlockingQueue to store the intermediate results blocks - protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>(); - protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0); - 
protected final ResultsBlockMerger<T> _resultsBlockMerger; - protected BaseSingleBlockCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators, QueryContext queryContext, ExecutorService executorService) { - super(operators, queryContext, executorService); - _resultsBlockMerger = resultsBlockMerger; + super(resultsBlockMerger, operators, queryContext, executorService); } @Override @@ -92,7 +80,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock> @Override protected void processSegments() { int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { Operator operator = _operators.get(operatorId); T resultsBlock; try { @@ -116,6 +104,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock> @Override protected void onProcessSegmentsException(Throwable t) { + _processingException.compareAndSet(null, t); _blockingQueue.offer(new ExceptionResultsBlock(t)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 0ae665c26d..88fe1a702d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -18,18 +18,14 @@ */ package org.apache.pinot.core.operator.combine; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import 
org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.ConcurrentIndexedTable; @@ -69,7 +65,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group private final int _numAggregationFunctions; private final int _numGroupByExpressions; private final int _numColumns; - private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>(); // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished // _futures (try to interrupt the execution if it already started). private final CountDownLatch _operatorLatch; @@ -129,7 +124,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group @Override protected void processSegments() { int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { Operator operator = _operators.get(operatorId); try { if (operator instanceof AcquireReleaseColumnsSegmentOperator) { @@ -155,12 +150,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group } } - // Merge processing exceptions. - List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions(); - if (processingExceptionsToMerge != null) { - _mergedProcessingExceptions.addAll(processingExceptionsToMerge); - } - // Set groups limit reached flag. 
if (resultsBlock.isNumGroupsLimitReached()) { _numGroupsLimitReached = true; @@ -209,7 +198,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group @Override public void onProcessSegmentsException(Throwable t) { - _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t)); + _processingException.compareAndSet(null, t); } @Override @@ -244,6 +233,11 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group return new ExceptionResultsBlock(new TimeoutException(errorMessage)); } + Throwable processingException = _processingException.get(); + if (processingException != null) { + return new ExceptionResultsBlock(processingException); + } + IndexedTable indexedTable = _indexedTable; if (!_queryContext.isServerReturnFinalResult()) { indexedTable.finish(false); @@ -254,12 +248,6 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached); mergedBlock.setNumResizes(indexedTable.getNumResizes()); mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs()); - - // Set the processing exceptions. 
- if (!_mergedProcessingExceptions.isEmpty()) { - mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); - } - return mergedBlock; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index 666332d5ac..a5e744fc6d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -147,7 +147,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator Comparable threadBoundaryValue = null; int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { if (operatorId >= _endOperatorId.get()) { _blockingQueue.offer(EMPTY_RESULTS_BLOCK); continue; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java index 96680d124b..10c547d780 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java @@ -19,9 +19,7 @@ package org.apache.pinot.core.operator.streaming; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.pinot.common.exception.QueryException; @@ -42,25 +40,18 @@ import 
org.slf4j.LoggerFactory; @SuppressWarnings({"rawtypes", "unchecked"}) public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock> - extends BaseCombineOperator<BaseResultsBlock> { + extends BaseCombineOperator<T> { private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamingCombineOperator.class); - /** - * Special results block to indicate that this is the last results block for a child operator in the list - */ + // Use a special results block to indicate that this is the last results block for a child operator in the list public static final MetadataResultsBlock LAST_RESULTS_BLOCK = new MetadataResultsBlock(); - // Use a BlockingQueue to store the intermediate results blocks - protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>(); - protected final ResultsBlockMerger<T> _resultsBlockMerger; - protected int _numOperatorsFinished; protected boolean _querySatisfied; public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators, QueryContext queryContext, ExecutorService executorService) { - super(operators, queryContext, executorService); - _resultsBlockMerger = resultsBlockMerger; + super(resultsBlockMerger, operators, queryContext, executorService); } /** @@ -115,7 +106,7 @@ public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock> protected void processSegments() { int operatorId; Object tracker = createQuerySatisfiedTracker(); - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { Operator<T> operator = _operators.get(operatorId); try { if (operator instanceof AcquireReleaseColumnsSegmentOperator) { @@ -152,6 +143,7 @@ public abstract class BaseStreamingCombineOperator<T extends BaseResultsBlock> @Override protected void onProcessSegmentsException(Throwable t) { + 
_processingException.compareAndSet(null, t); _blockingQueue.offer(new ExceptionResultsBlock(t)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java index 8b04d70265..53d0a3820b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java @@ -18,18 +18,15 @@ */ package org.apache.pinot.core.operator.streaming; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.ConcurrentIndexedTable; @@ -73,7 +70,6 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato private final int _numAggregationFunctions; private final int _numGroupByExpressions; private final int _numColumns; - private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>(); // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished // _futures (try to interrupt the execution if it already started). 
private final CountDownLatch _operatorLatch; @@ -156,7 +152,7 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato @Override public void processSegments() { int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { Operator operator = _operators.get(operatorId); try { if (operator instanceof AcquireReleaseColumnsSegmentOperator) { @@ -182,12 +178,6 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato } } - // Merge processing exceptions. - List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions(); - if (processingExceptionsToMerge != null) { - _mergedProcessingExceptions.addAll(processingExceptionsToMerge); - } - // Set groups limit reached flag. if (resultsBlock.isNumGroupsLimitReached()) { _numGroupsLimitReached = true; @@ -248,6 +238,11 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato return new ExceptionResultsBlock(new TimeoutException(errorMessage)); } + Throwable processingException = _processingException.get(); + if (processingException != null) { + return new ExceptionResultsBlock(processingException); + } + IndexedTable indexedTable = _indexedTable; if (!_queryContext.isServerReturnFinalResult()) { indexedTable.finish(false); @@ -258,17 +253,12 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached); mergedBlock.setNumResizes(indexedTable.getNumResizes()); mergedBlock.setResizeTimeMs(indexedTable.getResizeTimeMs()); - - // Set the processing exceptions. 
- if (!_mergedProcessingExceptions.isEmpty()) { - mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions)); - } return mergedBlock; } @Override public void onProcessSegmentsException(Throwable t) { - _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t)); + _processingException.compareAndSet(null, t); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org