Jackie-Jiang commented on code in PR #8523:
URL: https://github.com/apache/pinot/pull/8523#discussion_r851513738


##########
pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java:
##########
@@ -73,87 +76,33 @@ public CombinePlanNode(List<PlanNode> planNodes, 
QueryContext queryContext, Exec
     _streamObserver = streamObserver;
   }
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
   @Override
   public BaseCombineOperator run() {
-    int numPlanNodes = _planNodes.size();
-    List<Operator> operators = new ArrayList<>(numPlanNodes);
+    try (InvocationScope scope = 
Tracing.getTracer().createScope(CombinePlanNode.class)) {
+      return getCombineOperator(scope);
+    }
+  }
 
+  private BaseCombineOperator getCombineOperator(InvocationScope scope) {
+    int numPlanNodes = _planNodes.size();
+    scope.setNumChildren(numPlanNodes);
+    List<Operator> operators;
     if (numPlanNodes <= TARGET_NUM_PLANS_PER_THREAD) {
       // Small number of plan nodes, run them sequentially
-      for (PlanNode planNode : _planNodes) {
-        operators.add(planNode.run());
-      }
+      operators = getOperatorsSingleThreaded();
     } else {
       // Large number of plan nodes, run them in parallel
-
       int maxExecutionThreads = _queryContext.getMaxExecutionThreads();
       if (maxExecutionThreads <= 0) {
         maxExecutionThreads = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY;
       }
-      int numTasks =
-          Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / 
TARGET_NUM_PLANS_PER_THREAD, maxExecutionThreads);
-
-      // Use a Phaser to ensure all the Futures are done (not scheduled, 
finished or interrupted) before the main thread
-      // returns. We need to ensure no execution left before the main thread 
returning because the main thread holds the
-      // reference to the segments, and if the segments are deleted/refreshed, 
the segments can be released after the
-      // main thread returns, which would lead to undefined behavior (even JVM 
crash) when executing queries against
-      // them.
-      Phaser phaser = new Phaser(1);
-
-      // Submit all jobs
-      Future[] futures = new Future[numTasks];
-      for (int i = 0; i < numTasks; i++) {
-        int index = i;
-        futures[i] = _executorService.submit(new 
TraceCallable<List<Operator>>() {
-          @Override
-          public List<Operator> callJob() {
-            try {
-              // Register the thread to the phaser.
-              // If the phaser is terminated (returning negative value) when 
trying to register the thread, that means
-              // the query execution has timed out, and the main thread has 
deregistered itself and returned the result.
-              // Directly return as no execution result will be taken.
-              if (phaser.register() < 0) {
-                return Collections.emptyList();
-              }
-
-              List<Operator> operators = new ArrayList<>();
-              for (int i = index; i < numPlanNodes; i += numTasks) {
-                operators.add(_planNodes.get(i).run());
-              }
-              return operators;
-            } finally {
-              phaser.arriveAndDeregister();
-            }
-          }
-        });
-      }
-
-      // Get all results
-      try {
-        for (Future future : futures) {
-          List<Operator> ops = (List<Operator>) 
future.get(_queryContext.getEndTimeMs() - System.currentTimeMillis(),
-              TimeUnit.MILLISECONDS);
-          operators.addAll(ops);
-        }
-      } catch (Exception e) {
-        // Future object will throw ExecutionException for execution 
exception, need to check the cause to determine
-        // whether it is caused by bad query
-        Throwable cause = e.getCause();
-        if (cause instanceof BadQueryRequestException) {
-          throw (BadQueryRequestException) cause;
-        } else {
-          throw new RuntimeException("Caught exception while running 
CombinePlanNode.", e);
-        }
-      } finally {
-        // Cancel all ongoing jobs
-        for (Future future : futures) {
-          if (!future.isDone()) {
-            future.cancel(true);
-          }
-        }
-        // Deregister the main thread and wait for all threads done
-        phaser.awaitAdvance(phaser.arriveAndDeregister());
+      if (maxExecutionThreads == 1) {
+        operators = getOperatorsSingleThreaded();
+      } else {
+        int numTasks = Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 
1) / TARGET_NUM_PLANS_PER_THREAD,
+            maxExecutionThreads);
+        scope.setNumTasks(numTasks);

Review Comment:
   That's already recorded on line 88



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to