walterddr commented on code in PR #10169:
URL: https://github.com/apache/pinot/pull/10169#discussion_r1085993792


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java:
##########
@@ -71,11 +68,8 @@ protected BaseResultsBlock getNextBlock() {
     try {
       startProcess();
       mergedBlock = mergeResults();
-    } catch (InterruptedException | EarlyTerminationException e) {
-      Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
-      throw new QueryCancelledException(
-          "Cancelled while merging results blocks" + (killedErrorMsg == null ? StringUtils.EMPTY
-              : " " + killedErrorMsg), e);
+    } catch (InterruptedException e) {
+      throw new EarlyTerminationException();

Review Comment:
   Could you include the original exception?
   ```suggestion
         throw new EarlyTerminationException(e);
   ```
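A minimal sketch of what the suggestion preserves. It assumes a stand-in `EarlyTerminationException` with a cause-accepting constructor (hypothetical here; check the real Pinot class before relying on that signature), and `awaitMerge` is a hypothetical placeholder for the blocking merge step. With the no-arg constructor, `getCause()` is `null` and the interrupt's stack trace is lost; passing `e` keeps it attached.

```java
/**
 * Sketch: translating an InterruptedException with and without keeping it as the cause.
 * EarlyTerminationException here is a hypothetical stand-in, not Pinot's class.
 */
public class CauseChainingSketch {
  static class EarlyTerminationException extends RuntimeException {
    EarlyTerminationException() {
      super();
    }

    // Hypothetical cause-accepting constructor; the real class may differ.
    EarlyTerminationException(Throwable cause) {
      super(cause);
    }
  }

  // Hypothetical placeholder for a merge step that is interrupted while blocked.
  static void awaitMerge() throws InterruptedException {
    throw new InterruptedException("interrupted while waiting for results blocks");
  }

  static void getNextBlock(boolean keepCause) {
    try {
      awaitMerge();
    } catch (InterruptedException e) {
      // Without the cause, the InterruptedException and its stack trace are dropped.
      throw keepCause ? new EarlyTerminationException(e) : new EarlyTerminationException();
    }
  }

  // Returns the cause attached to the thrown EarlyTerminationException (null if none).
  static Throwable causeOf(boolean keepCause) {
    try {
      getNextBlock(keepCause);
      return null;
    } catch (EarlyTerminationException e) {
      return e.getCause();
    }
  }

  public static void main(String[] args) {
    System.out.println("without cause: " + causeOf(false));
    System.out.println("with cause: " + causeOf(true));
  }
}
```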



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java:
##########
@@ -111,13 +105,11 @@ protected void processSegments() {
           ((AcquireReleaseColumnsSegmentOperator) operator).release();
         }
       }
-
+      _blockingQueue.offer(resultsBlock);
+      // When query is satisfied, skip processing the remaining segments
       if (_resultsBlockMerger.isQuerySatisfied(resultsBlock)) {
-        // Query is satisfied, skip processing the remaining segments
-        _blockingQueue.offer(resultsBlock);
+        _nextOperatorId.set(_numOperators);

Review Comment:
   Hmm, I am a bit worried about this one.
   Basically, this makes it practically impossible for `numBlocksMerged` to reach `numOperators` during the `mergeResultBlock` method, so we are relying solely on the early-termination mechanism to also terminate `mergeResultBlock` early.
   
   However, `_resultsBlockMerger.isQuerySatisfied(resultsBlock)` here is not the same as the check in the merge method, `_resultsBlockMerger.isQuerySatisfied(mergedBlock)`.
   Are we 100% sure these are equivalent?
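A toy, single-threaded model of this concern, with hypothetical names throughout (it is an illustration, not Pinot's `ResultsBlockMerger`). A block is just a row count. `produce` mimics `processSegments()`: each block is offered, and once a single block is satisfied the next operator id jumps to the operator count. `mergeTerminates` mimics the merge loop, which exits only when every operator's block has been merged or the merged block is satisfied. The per-block and merged checks are passed as two separate predicates so their agreement can be varied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

/** Toy model of the per-block vs. merged-block "query satisfied" checks. All names are hypothetical. */
public class QuerySatisfiedModel {
  /**
   * Mimics processSegments(): each block is offered, and once a single block satisfies the query,
   * the next operator id jumps to the operator count so the remaining operators are skipped.
   */
  static List<Integer> produce(int[] blockRows, IntPredicate blockSatisfied) {
    AtomicInteger nextOperatorId = new AtomicInteger();
    List<Integer> offered = new ArrayList<>();
    int operatorId;
    while ((operatorId = nextOperatorId.getAndIncrement()) < blockRows.length) {
      int rows = blockRows[operatorId];
      offered.add(rows);  // like _blockingQueue.offer(resultsBlock)
      if (blockSatisfied.test(rows)) {
        nextOperatorId.set(blockRows.length);  // like _nextOperatorId.set(_numOperators)
      }
    }
    return offered;
  }

  /**
   * Mimics the merge loop: it stops once every operator's block is merged or the merged block is
   * satisfied. Returns false if it runs out of offered blocks first (the real loop would keep waiting).
   */
  static boolean mergeTerminates(List<Integer> offered, int numOperators, IntPredicate mergedSatisfied) {
    int mergedRows = 0;
    int numBlocksMerged = 0;
    for (int rows : offered) {
      mergedRows += rows;
      numBlocksMerged++;
      if (numBlocksMerged == numOperators || mergedSatisfied.test(mergedRows)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int[] blockRows = {3, 12, 4, 5};
    List<Integer> offered = produce(blockRows, rows -> rows >= 10);
    System.out.println("offered blocks: " + offered);
    System.out.println("consistent checks terminate: "
        + mergeTerminates(offered, blockRows.length, rows -> rows >= 10));
    System.out.println("stricter merged check terminates: "
        + mergeTerminates(offered, blockRows.length, rows -> rows >= 20));
  }
}
```

With a consistent pair of checks (at least 10 rows for both), merging only adds rows, so a satisfied single block implies a satisfied merged block and the loop exits. If the merged check is stricter than the per-block one (at least 20 rows), `mergeTerminates` returns `false`, which in the real operator would presumably mean polling until the query times out.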
   



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java:
##########
@@ -50,49 +52,55 @@
 
   // Use a BlockingQueue to store the intermediate results blocks
   protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
+  protected final ResultsBlockMerger<T> _resultsBlockMerger;
 
   protected int _numOperatorsFinished;
-  protected BaseResultsBlock _exceptionBlock;
+  protected boolean _querySatisfied;
 
-  public BaseStreamingCombineOperator(List<Operator> operators,
+  public BaseStreamingCombineOperator(ResultsBlockMerger<T> resultsBlockMerger, List<Operator> operators,
       QueryContext queryContext, ExecutorService executorService) {
     super(operators, queryContext, executorService);
+    _resultsBlockMerger = resultsBlockMerger;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * When all the results blocks are returned, returns a final metadata block. Caller shouldn't call this method after
+   * it returns the metadata block or exception block.
+   */
   @Override
   protected BaseResultsBlock getNextBlock() {
     long endTimeMs = _queryContext.getEndTimeMs();
-    // TODO: Early terminate when query is satisfied
-    while (_exceptionBlock == null && _numOperatorsFinished < _numOperators) {
+    Object querySatisfiedTracker = createQuerySatisfiedTracker();
+    while (!_querySatisfied && _numOperatorsFinished < _numOperators) {
       try {
         BaseResultsBlock resultsBlock =
             _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-
         if (resultsBlock == null) {
           // Query times out, skip streaming the remaining results blocks
           LOGGER.error("Timed out while polling results block (query: {})", _queryContext);
-          _exceptionBlock = new ExceptionResultsBlock(
-              QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-                  new TimeoutException("Timed out while polling results block")));
-          return _exceptionBlock;
+          return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
         }
         if (resultsBlock.getProcessingExceptions() != null) {
           // Caught exception while processing segment, skip streaming the remaining results blocks and directly return
           // the exception
-          _exceptionBlock = resultsBlock;
-          return _exceptionBlock;
+          return resultsBlock;
         }
         if (resultsBlock == LAST_RESULTS_BLOCK) {
           // Caught LAST_RESULTS_BLOCK from a specific task, indicated it has finished.
           // Skip returning this metadata block and continue to process the next from the _blockingQueue.
           _numOperatorsFinished++;
           continue;
         }
+        _querySatisfied = isQuerySatisfied((T) resultsBlock, querySatisfiedTracker);
         return resultsBlock;
+      } catch (InterruptedException e) {
+        throw new EarlyTerminationException();

Review Comment:
   Actually, I have another suggestion: let's pass `e` into it, since it may carry a useful stack trace, or at least log it here.
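A sketch of the polling loop in `getNextBlock()` with this suggestion applied: the interrupt is both logged and passed along as the cause. Assumptions: `java.util.logging` stands in for the SLF4J-style `LOGGER` used in the diff, blocks are plain `Object`s, `LAST_BLOCK` mirrors `LAST_RESULTS_BLOCK`, `TIMEOUT_BLOCK` is a hypothetical stand-in for the timeout `ExceptionResultsBlock`, and `EarlyTerminationException` is a local stand-in class, not Pinot's.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Sketch of a streaming poll loop with a deadline; names and types are simplified stand-ins. */
public class StreamingPollSketch {
  private static final Logger LOGGER = Logger.getLogger(StreamingPollSketch.class.getName());

  // Sentinel a producer enqueues when it has finished (mirrors LAST_RESULTS_BLOCK).
  static final Object LAST_BLOCK = new Object();
  // Hypothetical stand-in for the timeout ExceptionResultsBlock.
  static final Object TIMEOUT_BLOCK = new Object();

  // Hypothetical stand-in for EarlyTerminationException that accepts a cause.
  static class EarlyTerminationException extends RuntimeException {
    EarlyTerminationException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private final BlockingQueue<Object> _blockingQueue;
  private final int _numOperators;
  private final long _endTimeMs;
  private int _numOperatorsFinished;

  StreamingPollSketch(BlockingQueue<Object> blockingQueue, int numOperators, long endTimeMs) {
    _blockingQueue = blockingQueue;
    _numOperators = numOperators;
    _endTimeMs = endTimeMs;
  }

  /** Returns the next data block, TIMEOUT_BLOCK if the deadline passes, or null once all operators finished. */
  Object nextBlock() {
    while (_numOperatorsFinished < _numOperators) {
      try {
        // With a non-positive remaining time, poll does not wait (null if the queue is empty).
        Object block = _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        if (block == null) {
          return TIMEOUT_BLOCK;
        }
        if (block == LAST_BLOCK) {
          // One operator finished; keep polling for blocks from the others.
          _numOperatorsFinished++;
          continue;
        }
        return block;
      } catch (InterruptedException e) {
        // Log the interrupt and keep it as the cause instead of dropping it.
        LOGGER.log(Level.WARNING, "Interrupted while polling results block", e);
        throw new EarlyTerminationException("Interrupted while polling results block", e);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    queue.add("block-1");
    queue.add(LAST_BLOCK);
    queue.add(LAST_BLOCK);
    StreamingPollSketch sketch = new StreamingPollSketch(queue, 2, System.currentTimeMillis() + 1000);
    System.out.println(sketch.nextBlock());
    System.out.println(sketch.nextBlock());
  }
}
```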



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java:
##########
@@ -111,6 +116,10 @@ private BaseResultsBlock getCombinedResults() {
     try {
       prefetchAll();
       return _combineOperator.nextBlock();
+    } catch (EarlyTerminationException e) {
+      Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
+      return new ExceptionResultsBlock(new QueryCancelledException(
+          "Cancelled while combining results" + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg)));

Review Comment:
   This is highly unlikely, but I was able to reproduce it by fiddling with the underlying threading mechanism :-D
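A minimal sketch of the pattern in this hunk: at the top-level operator, an `EarlyTerminationException` is turned into an error result instead of propagating, and the accountant's error status is appended to the message when present. `combine`, the two `Supplier`s (standing in for `_combineOperator.nextBlock()` and `Tracing.getThreadAccountant().getErrorStatus()`), the local `EarlyTerminationException`, and the plain `String` result (instead of an `ExceptionResultsBlock` wrapping a `QueryCancelledException`) are all hypothetical simplifications.

```java
import java.util.function.Supplier;

/** Sketch: converting early termination into an error result at the top-level operator. */
public class CancellationMessageSketch {
  // Hypothetical stand-in for Pinot's EarlyTerminationException.
  static class EarlyTerminationException extends RuntimeException {
  }

  /**
   * Runs the combine step; on early termination returns an error message instead of rethrowing.
   * nextBlock stands in for _combineOperator.nextBlock(), errorStatus for the thread accountant's error status.
   */
  static String combine(Supplier<String> nextBlock, Supplier<Exception> errorStatus) {
    try {
      return nextBlock.get();
    } catch (EarlyTerminationException e) {
      Exception killedErrorMsg = errorStatus.get();
      // Concatenation uses killedErrorMsg.toString(), which starts with the exception's class name.
      return "Cancelled while combining results" + (killedErrorMsg == null ? "" : " " + killedErrorMsg);
    }
  }

  public static void main(String[] args) {
    System.out.println(combine(() -> "merged block", () -> null));
    System.out.println(combine(() -> {
      throw new EarlyTerminationException();
    }, () -> new RuntimeException("memory limit exceeded")));
  }
}
```

Catching at this level keeps the cancellation visible as an error result in the response, rather than surfacing as an uncaught exception from the combine operator.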



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

