siddharthteotia commented on code in PR #10006:
URL: https://github.com/apache/pinot/pull/10006#discussion_r1068855977


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -122,166 +118,64 @@ public void runJob() {
             } else {
               LOGGER.error("Caught serious error while processing query: " + 
_queryContext, t);
             }
-            onException(t);
+            onProcessSegmentsException(t);
           } finally {
-            onFinish();
-            phaser.arriveAndDeregister();
+            onProcessSegmentsFinish();
+            _phaser.arriveAndDeregister();
             Tracing.ThreadAccountantOps.clear();
           }
 
           
_totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs());
         }
       });
     }
-
-    BaseResultsBlock mergedBlock;
-    try {
-      mergedBlock = mergeResults();
-    } catch (InterruptedException | EarlyTerminationException e) {
-      Exception killedErrorMsg = 
Tracing.getThreadAccountant().getErrorStatus();
-      throw new QueryCancelledException(
-          "Cancelled while merging results blocks"
-              + (killedErrorMsg == null ? StringUtils.EMPTY : " " + 
killedErrorMsg), e);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while merging results blocks (query: 
{})", _queryContext, e);
-      mergedBlock = new 
ExceptionResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR,
 e));
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-      // Deregister the main thread and wait for all threads done
-      phaser.awaitAdvance(phaser.arriveAndDeregister());
-    }
-    /*
-     * _numTasks are number of async tasks submitted to the _executorService, 
but it does not mean Pinot server
-     * use those number of threads to concurrently process segments. Instead, 
if _executorService thread pool has
-     * less number of threads than _numTasks, the number of threads that used 
to concurrently process segments equals
-     * to the pool size.
-     * TODO: Get the actual number of query worker threads instead of using 
the default value.
-     */
-    int numServerThreads = Math.min(_numTasks, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
-    CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators, 
_totalWorkerThreadCpuTimeNs.get(),
-        numServerThreads);
-    return mergedBlock;
   }
 
   /**
-   * Executes query on one or more segments in a worker thread.
+   * Stop the combine operator process. This will stop all sub-tasks that were 
spun up to process data segments.
    */
-  protected void processSegments() {
-    int operatorId;
-    while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
-      Operator operator = _operators.get(operatorId);
-      T resultsBlock;
-      try {
-        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
-          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
-        }
-        resultsBlock = (T) operator.nextBlock();
-      } finally {
-        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
-          ((AcquireReleaseColumnsSegmentOperator) operator).release();
-        }
-      }
-
-      if (isQuerySatisfied(resultsBlock)) {
-        // Query is satisfied, skip processing the remaining segments
-        _blockingQueue.offer(resultsBlock);
-        return;
-      } else {
-        _blockingQueue.offer(resultsBlock);
+  protected void stopProcess() {
+    // Cancel all ongoing jobs
+    for (Future future : _futures) {
+      if (!future.isDone()) {
+        future.cancel(true);
       }
     }
+    // Deregister the main thread and wait for all threads done
+    _phaser.awaitAdvance(_phaser.arriveAndDeregister());

Review Comment:
   This should be in finally imo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to