siddharthteotia commented on code in PR #10169:
URL: https://github.com/apache/pinot/pull/10169#discussion_r1084762766


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java:
##########
@@ -50,49 +52,55 @@
 
   // Use a BlockingQueue to store the intermediate results blocks
   protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
+  protected final ResultsBlockMerger<T> _resultsBlockMerger;
 
   protected int _numOperatorsFinished;
-  protected BaseResultsBlock _exceptionBlock;
+  protected boolean _querySatisfied;
 
-  public BaseStreamingCombineOperator(List<Operator> operators,
+  public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
       QueryContext queryContext, ExecutorService executorService) {
     super(operators, queryContext, executorService);
+    _resultsBlockMerger = resultsBlockMerger;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * When all the results blocks are returned, returns a final metadata block. Caller shouldn't call this method after
+   * it returns the metadata block or exception block.
+   */
   @Override
   protected BaseResultsBlock getNextBlock() {
     long endTimeMs = _queryContext.getEndTimeMs();
-    // TODO: Early terminate when query is satisfied
-    while (_exceptionBlock == null && _numOperatorsFinished < _numOperators) {
+    Object querySatisfiedTracker = createQuerySatisfiedTracker();
+    while (!_querySatisfied && _numOperatorsFinished < _numOperators) {
       try {
         BaseResultsBlock resultsBlock =
             _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-
         if (resultsBlock == null) {
           // Query times out, skip streaming the remaining results blocks
           LOGGER.error("Timed out while polling results block (query: {})", _queryContext);
-          _exceptionBlock = new ExceptionResultsBlock(
-              QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-                  new TimeoutException("Timed out while polling results block")));
-          return _exceptionBlock;
+          return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
         }
         if (resultsBlock.getProcessingExceptions() != null) {
           // Caught exception while processing segment, skip streaming the remaining results blocks and directly return
           // the exception
-          _exceptionBlock = resultsBlock;
-          return _exceptionBlock;
+          return resultsBlock;
         }
         if (resultsBlock == LAST_RESULTS_BLOCK) {
           // Caught LAST_RESULTS_BLOCK from a specific task, indicated it has finished.
           // Skip returning this metadata block and continue to process the next from the _blockingQueue.
           _numOperatorsFinished++;
           continue;
         }
+        _querySatisfied = isQuerySatisfied((T) resultsBlock, querySatisfiedTracker);
         return resultsBlock;
+      } catch (InterruptedException e) {
+        throw new EarlyTerminationException();

Review Comment:
   Maybe we should instantiate `EarlyTerminationException` with `e.getMessage()`?
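
Roughly what I have in mind, as a minimal standalone sketch rather than the actual change. The nested `EarlyTerminationException` below is a stand-in, and its `String` constructor is an assumption about what the real class offers; `pollOrTerminate` and the message prefix are hypothetical names for illustration. Since `InterruptedException` is often thrown without a message, the sketch adds a fixed prefix instead of passing `e.getMessage()` alone.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EarlyTerminationSketch {
  // Stand-in for the real exception class; the String constructor is an assumption.
  static class EarlyTerminationException extends RuntimeException {
    EarlyTerminationException(String message) {
      super(message);
    }
  }

  // Hypothetical helper: polls the queue and converts an interrupt into an
  // EarlyTerminationException that carries the original message.
  static String pollOrTerminate(BlockingQueue<String> queue, long timeoutMs) {
    try {
      return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe it.
      Thread.currentThread().interrupt();
      throw new EarlyTerminationException("Interrupted while polling results block: " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Simulate an interrupt arriving before the poll.
    Thread.currentThread().interrupt();
    try {
      pollOrTerminate(queue, 1000);
    } catch (EarlyTerminationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Restoring the interrupt flag is optional for this suggestion; it is included only because it is the usual idiom when an `InterruptedException` is translated into an unchecked exception.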



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


