siddharthteotia commented on code in PR #10169: URL: https://github.com/apache/pinot/pull/10169#discussion_r1084762766
########## pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java: ########## @@ -50,49 +52,55 @@ // Use a BlockingQueue to store the intermediate results blocks protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>(); + protected final ResultsBlockMerger<T> _resultsBlockMerger; protected int _numOperatorsFinished; - protected BaseResultsBlock _exceptionBlock; + protected boolean _querySatisfied; - public BaseStreamingCombineOperator(List<Operator> operators, + public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators, QueryContext queryContext, ExecutorService executorService) { super(operators, queryContext, executorService); + _resultsBlockMerger = resultsBlockMerger; } + /** + * {@inheritDoc} + * + * When all the results blocks are returned, returns a final metadata block. Caller shouldn't call this method after + * it returns the metadata block or exception block. 
+ */ @Override protected BaseResultsBlock getNextBlock() { long endTimeMs = _queryContext.getEndTimeMs(); - // TODO: Early terminate when query is satisfied - while (_exceptionBlock == null && _numOperatorsFinished < _numOperators) { + Object querySatisfiedTracker = createQuerySatisfiedTracker(); + while (!_querySatisfied && _numOperatorsFinished < _numOperators) { try { BaseResultsBlock resultsBlock = _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (resultsBlock == null) { // Query times out, skip streaming the remaining results blocks LOGGER.error("Timed out while polling results block (query: {})", _queryContext); - _exceptionBlock = new ExceptionResultsBlock( - QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, - new TimeoutException("Timed out while polling results block"))); - return _exceptionBlock; + return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); } if (resultsBlock.getProcessingExceptions() != null) { // Caught exception while processing segment, skip streaming the remaining results blocks and directly return // the exception - _exceptionBlock = resultsBlock; - return _exceptionBlock; + return resultsBlock; } if (resultsBlock == LAST_RESULTS_BLOCK) { // Caught LAST_RESULTS_BLOCK from a specific task, indicated it has finished. // Skip returning this metadata block and continue to process the next from the _blockingQueue. _numOperatorsFinished++; continue; } + _querySatisfied = isQuerySatisfied((T) resultsBlock, querySatisfiedTracker); return resultsBlock; + } catch (InterruptedException e) { + throw new EarlyTerminationException(); Review Comment: Maybe we should instantiate `EarlyTerminationException` with `e.getMessage()`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org