siddharthteotia commented on code in PR #10006: URL: https://github.com/apache/pinot/pull/10006#discussion_r1068855977
########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java: ########## @@ -122,166 +118,64 @@ public void runJob() { } else { LOGGER.error("Caught serious error while processing query: " + _queryContext, t); } - onException(t); + onProcessSegmentsException(t); } finally { - onFinish(); - phaser.arriveAndDeregister(); + onProcessSegmentsFinish(); + _phaser.arriveAndDeregister(); Tracing.ThreadAccountantOps.clear(); } _totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs()); } }); } - - BaseResultsBlock mergedBlock; - try { - mergedBlock = mergeResults(); - } catch (InterruptedException | EarlyTerminationException e) { - Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus(); - throw new QueryCancelledException( - "Cancelled while merging results blocks" - + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg), e); - } catch (Exception e) { - LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e); - mergedBlock = new ExceptionResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e)); - } finally { - // Cancel all ongoing jobs - for (Future future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - // Deregister the main thread and wait for all threads done - phaser.awaitAdvance(phaser.arriveAndDeregister()); - } - /* - * _numTasks are number of async tasks submitted to the _executorService, but it does not mean Pinot server - * use those number of threads to concurrently process segments. Instead, if _executorService thread pool has - * less number of threads than _numTasks, the number of threads that used to concurrently process segments equals - * to the pool size. - * TODO: Get the actual number of query worker threads instead of using the default value. 
- */ - int numServerThreads = Math.min(_numTasks, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); - CombineOperatorUtils.setExecutionStatistics(mergedBlock, _operators, _totalWorkerThreadCpuTimeNs.get(), - numServerThreads); - return mergedBlock; } /** - * Executes query on one or more segments in a worker thread. + * Stop the combine operator process. This will stop all sub-tasks that were spun up to process data segments. */ - protected void processSegments() { - int operatorId; - while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { - Operator operator = _operators.get(operatorId); - T resultsBlock; - try { - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { - ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); - } - resultsBlock = (T) operator.nextBlock(); - } finally { - if (operator instanceof AcquireReleaseColumnsSegmentOperator) { - ((AcquireReleaseColumnsSegmentOperator) operator).release(); - } - } - - if (isQuerySatisfied(resultsBlock)) { - // Query is satisfied, skip processing the remaining segments - _blockingQueue.offer(resultsBlock); - return; - } else { - _blockingQueue.offer(resultsBlock); + protected void stopProcess() { + // Cancel all ongoing jobs + for (Future future : _futures) { + if (!future.isDone()) { + future.cancel(true); } } + // Deregister the main thread and wait for all threads done + _phaser.awaitAdvance(_phaser.arriveAndDeregister()); Review Comment: This should be in a `finally` block, in my opinion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org