Copilot commented on code in PR #16582:
URL: https://github.com/apache/pinot/pull/16582#discussion_r2272016709


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java:
##########
@@ -340,4 +349,59 @@ public void earlyTerminateMethodInterruptsSseTasks() {
     // Then:
     verify(operator).cancelSseTasks();
   }
+
+  @Test
+  public void executionThreadShouldNotBlockOnLastResultsBlockWhenCancelled()
+      throws Exception {
+    // Given: operator with queue size 1
+    DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
+
+    Map<String, String> opChainMetadata = new HashMap<>();
+    opChainMetadata.put("maxStreamingPendingBlocks", "1");
+    opChainMetadata.put("timeoutMs", "100000");
+    OpChainExecutionContext context = OperatorTestUtil.getContext(opChainMetadata);
+
+    LeafOperator operator =
+        new LeafOperator(context, mockQueryRequests(1), schema, mock(QueryExecutor.class), _executorService) {
+          @Override
+          void execute(ThreadExecutionContext parentContext) {
+            try {
+              // Fill queue and block on second add
+              SelectionResultsBlock dataBlock =
+                  new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2}),
+                      queryContext);
+              //noinspection InfiniteLoopStatement
+              while (true) {
+                addResultsBlock(dataBlock);
+              }
+            } catch (Exception e) {
+              assertTrue(e instanceof InterruptedException);
+            }
+          }
+        };
+
+    // Main thread reads the next block to start the execution
+    assertTrue(operator.getNextBlock() instanceof MseBlock.Data);
+
+    // Wait for blocking queue to fill up
+    while (operator._blockingQueue.isEmpty()) {
+      //noinspection BusyWait
+      Thread.sleep(100);
+    }

Review Comment:
   Using Thread.sleep() in a busy-wait loop is inefficient and can make the test flaky. Consider blocking on the queue itself (as in the suggestion below) or using a synchronization primitive such as a CountDownLatch to wait for the queue to fill up instead of polling with sleep.
   ```suggestion
       Object firstBlock = operator._blockingQueue.take();
       operator._blockingQueue.add(firstBlock);
   ```
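
   If the latch-based route is preferred, one possible shape is sketched below. This is only a sketch, not part of the PR: `queueFullLatch` is a hypothetical name, the explicit three-add sequence replaces the infinite loop from the test above, and `CountDownLatch`/`TimeUnit` imports are assumed.
   ```java
   CountDownLatch queueFullLatch = new CountDownLatch(1);

   LeafOperator operator =
       new LeafOperator(context, mockQueryRequests(1), schema, mock(QueryExecutor.class), _executorService) {
         @Override
         void execute(ThreadExecutionContext parentContext) {
           try {
             SelectionResultsBlock dataBlock =
                 new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2}),
                     queryContext);
             addResultsBlock(dataBlock);   // consumed by the main thread's getNextBlock()
             addResultsBlock(dataBlock);   // fills the queue (capacity 1)
             queueFullLatch.countDown();   // signal the main thread that the queue is full
             addResultsBlock(dataBlock);   // blocks here until the operator is cancelled
           } catch (Exception e) {
             assertTrue(e instanceof InterruptedException);
           }
         }
       };

   // Main thread: read the first block to start execution, then wait for the signal
   assertTrue(operator.getNextBlock() instanceof MseBlock.Data);
   assertTrue(queueFullLatch.await(10, TimeUnit.SECONDS));
   ```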



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java:
##########
@@ -374,114 +393,155 @@ private ExplainedNode asNode(ExplainInfo info) {
     return new ExplainedNode(_context.getStageId(), _dataSchema, null, inputs, info.getTitle(), info.getAttributes());
   }
 
-  private Future<Void> startExecution() {
-    ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
-    ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+  @Nullable
+  private ErrorMseBlock getErrorBlock() {
+    return _errorBlock.get();
+  }
+
+  private void setErrorBlock(ErrorMseBlock errorBlock) {
+    // Keep the first encountered error block
+    _errorBlock.compareAndSet(null, errorBlock);
+  }
+
+  private Future<?> startExecution() {
     ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
     return _executorService.submit(() -> {
       try {
-        if (_requests.size() == 1) {
-          ServerQueryRequest request = _requests.get(0);
-          Tracing.ThreadAccountantOps.setupWorker(1, parentContext);
-
-          InstanceResponseBlock instanceResponseBlock =
-              _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
-          if (queryLogger != null) {
-            queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+        execute(parentContext);
+      } catch (Exception e) {
+        setErrorBlock(
+            ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, "Caught exception while executing leaf stage: " + e));
+      } finally {
+        // Always add the last results block to mark the end of the execution and notify the main thread waiting for the
+        // results block.
+        try {
+          addResultsBlock(LAST_RESULTS_BLOCK);
+        } catch (Exception e) {
+          if (!(e instanceof EarlyTerminationException)) {
+            LOGGER.warn("Failed to add the last results block", e);
           }
-          // TODO: Revisit if we should treat all exceptions as query failure. Currently MERGE_RESPONSE_ERROR and
-          //       SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
-          Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
-          if (!exceptions.isEmpty()) {
-            _exceptions = exceptions;
-          } else {
-            // NOTE: Instance response block might contain data (not metadata only) when all the segments are pruned.
-            //       Add the results block if it contains data.
-            BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
-            if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
-              addResultsBlock(resultsBlock);
+        }
+      }
+    });
+  }
+
+  @VisibleForTesting
+  void execute(ThreadExecutionContext parentContext) {
+    ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
+    ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+    if (_requests.size() == 1) {
+      ServerQueryRequest request = _requests.get(0);
+      Tracing.ThreadAccountantOps.setupWorker(1, parentContext);
+
+      InstanceResponseBlock instanceResponseBlock =
+          _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
+      if (queryLogger != null) {
+        queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+      }
+      // Collect the execution stats
+      mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
+      // TODO: Revisit if we should treat all exceptions as query failure. Currently MERGE_RESPONSE_ERROR and
+      //       SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
+      Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
+      if (!exceptions.isEmpty()) {
+        setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+      } else {
+        // NOTE: Instance response block might contain data (not metadata only) when all the segments are pruned.
+        //       Add the results block if it contains data.
+        BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
+        if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
+          try {
+            addResultsBlock(resultsBlock);
+          } catch (InterruptedException e) {
+            setErrorBlock(CANCELLED_BLOCK);
+          } catch (TimeoutException e) {
+            setErrorBlock(TIMEOUT_BLOCK);
+          } catch (Exception e) {
+            if (!(e instanceof EarlyTerminationException)) {
+              LOGGER.warn("Failed to add results block", e);
+            }
+          }
+        }
+      }
+    } else {
+      // Hit 2 physical tables, one REALTIME and one OFFLINE
+      assert _requests.size() == 2;
+      Future<?>[] futures = new Future[2];
+      // TODO: this latch mechanism is not the most elegant. We should change it to use a CompletionService.
+      //  In order to interrupt the execution in case of error, we could use different mechanisms like throwing in the
+      //  future, or using a shared volatile variable.
+      CountDownLatch latch = new CountDownLatch(2);
+      for (int i = 0; i < 2; i++) {
+        ServerQueryRequest request = _requests.get(i);
+        int taskId = i;
+        futures[i] = _executorService.submit(() -> {
+          Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
+
+          try {
+            InstanceResponseBlock instanceResponseBlock =
+                _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
+            if (queryLogger != null) {
+              queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
             }
             // Collect the execution stats
             mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
-          }
-        } else {
-          assert _requests.size() == 2;
-          Future<Map<String, String>>[] futures = new Future[2];
-          // TODO: this latch mechanism is not the most elegant. We should change it to use a CompletionService.
-          //  In order to interrupt the execution in case of error, we could different mechanisms like throwing in the
-          //  future, or using a shared volatile variable.
-          CountDownLatch latch = new CountDownLatch(2);
-          for (int i = 0; i < 2; i++) {
-            ServerQueryRequest request = _requests.get(i);
-            int taskId = i;
-            futures[i] = _executorService.submit(() -> {
-              Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
-
-              try {
-                InstanceResponseBlock instanceResponseBlock =
-                    _queryExecutor.execute(request, _executorService, resultsBlockConsumer);
-                if (queryLogger != null) {
-                  queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
-                }
-                Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
-                if (!exceptions.isEmpty()) {
-                  // Drain the latch when receiving exception block and not wait for the other thread to finish
-                  _exceptions = exceptions;
-                  latch.countDown();
-                  return Collections.emptyMap();
-                } else {
-                  // NOTE: Instance response block might contain data (not metadata only) when all the segments are
-                  //       pruned. Add the results block if it contains data.
-                  BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
-                  if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
-                    addResultsBlock(resultsBlock);
+            Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
+            if (!exceptions.isEmpty()) {
+              setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+              // Drain the latch when receiving exception block and not wait for the other thread to finish
+              latch.countDown();
+            } else {
+              // NOTE: Instance response block might contain data (not metadata only) when all the segments are
+              //       pruned. Add the results block if it contains data.
+              BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
+              if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
+                try {
+                  addResultsBlock(resultsBlock);
+                } catch (InterruptedException e) {
+                  setErrorBlock(CANCELLED_BLOCK);
+                } catch (TimeoutException e) {
+                  setErrorBlock(TIMEOUT_BLOCK);
+                } catch (Exception e) {
+                  if (!(e instanceof EarlyTerminationException)) {
+                    LOGGER.warn("Failed to add results block", e);
                   }
-                  // Collect the execution stats
-                  return instanceResponseBlock.getResponseMetadata();
                 }
-              } finally {
-                latch.countDown();
               }
-            });
-          }
-          try {
-            if (!latch.await(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
-              throw new TimeoutException("Timed out waiting for leaf stage to finish");
-            }
-            // Propagate the exception thrown by the leaf stage
-            for (Future<Map<String, String>> future : futures) {
-              Map<String, String> stats =
-                  future.get(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-              mergeExecutionStats(stats);
             }
-          } catch (TimeoutException e) {
-            throw new TimeoutException("Timed out waiting for leaf stage to finish");
           } finally {
-            for (Future<?> future : futures) {
-              future.cancel(true);
-            }
+            latch.countDown();
           }
+        });
+      }
+      try {
+        if (!latch.await(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
+          setErrorBlock(TIMEOUT_BLOCK);
         }
-        return null;
+      } catch (InterruptedException e) {
+        setErrorBlock(TIMEOUT_BLOCK);

Review Comment:
   InterruptedException should set the cancelled error block, not the timeout 
error block. This line should be `setErrorBlock(CANCELLED_BLOCK);` to properly 
handle thread interruption.
   ```suggestion
           setErrorBlock(CANCELLED_BLOCK);
   ```
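
   If the interrupt status also matters to callers further up the stack, the catch block could restore it as well. A rough sketch (the names come from the diff above; the extra `interrupt()` call is only a suggestion, not part of this PR):
   ```java
   } catch (InterruptedException e) {
     // Interruption means the query was cancelled, not that it timed out
     setErrorBlock(CANCELLED_BLOCK);
     // Restore the interrupt flag so the executor thread sees the interruption
     Thread.currentThread().interrupt();
   }
   ```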



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java:
##########
@@ -145,225 +155,234 @@ public String toExplainString() {
   }
 
   @Override
-  protected MseBlock getNextBlock()
-      throws InterruptedException, TimeoutException {
+  protected MseBlock getNextBlock() {
     if (_executionFuture == null) {
       _executionFuture = startExecution();
     }
     if (_isEarlyTerminated) {
+      terminateAndClearResultsBlocks();
       return SuccessMseBlock.INSTANCE;
     }
-    // Here we use passive deadline because we end up waiting for the SSE operators
-    // which can timeout by their own
-    BaseResultsBlock resultsBlock =
-        _blockingQueue.poll(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    BaseResultsBlock resultsBlock;
+    try {
+      // Here we use passive deadline because we end up waiting for the SSE operators which can timeout by their own.
+      resultsBlock =
+          _blockingQueue.poll(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      terminateAndClearResultsBlocks();
+      return CANCELLED_BLOCK;
+    }
     if (resultsBlock == null) {
-      throw new TimeoutException("Timed out waiting for results block");
+      terminateAndClearResultsBlocks();
+      return TIMEOUT_BLOCK;
     }
-    // Terminate when receiving exception block
-    Map<Integer, String> exceptions = _exceptions;
-    if (exceptions != null) {
-      return ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions));
+    // Terminate when there is error block
+    ErrorMseBlock errorBlock = getErrorBlock();
+    if (errorBlock != null) {
+      terminateAndClearResultsBlocks();
+      return errorBlock;
     }
     if (resultsBlock == LAST_RESULTS_BLOCK) {
+      _terminated = true;
       return SuccessMseBlock.INSTANCE;
     } else {
       // Regular data block
       return composeMseBlock(resultsBlock);
     }
   }
 
-  @Override
-  protected StatMap<?> copyStatMaps() {
-    return new StatMap<>(_statMap);
-  }
-
-  @Override
-  protected void earlyTerminate() {
-    super.earlyTerminate();
-    cancelSseTasks();
-  }
-
-  @Override
-  public void cancel(Throwable e) {
-    super.cancel(e);
-    cancelSseTasks();
-  }
-
-  @VisibleForTesting
-  protected void cancelSseTasks() {
-    Future<Void> executionFuture = _executionFuture;
-    if (executionFuture != null) {
-      executionFuture.cancel(true);
-    }
-  }
-
-  private void mergeExecutionStats(@Nullable Map<String, String> executionStats) {
-    if (executionStats != null) {
-      for (Map.Entry<String, String> entry : executionStats.entrySet()) {
-        DataTable.MetadataKey key = DataTable.MetadataKey.getByName(entry.getKey());
-        if (key == null) {
-          LOGGER.debug("Skipping unknown execution stat: {}", entry.getKey());
-          continue;
-        }
-        switch (key) {
-          case UNKNOWN:
-            LOGGER.debug("Skipping unknown execution stat: {}", 
entry.getKey());
-            break;
-          case TABLE:
-            _statMap.merge(StatKey.TABLE, entry.getValue());
-            break;
-          case NUM_DOCS_SCANNED:
-            _statMap.merge(StatKey.NUM_DOCS_SCANNED, Long.parseLong(entry.getValue()));
-            break;
-          case NUM_ENTRIES_SCANNED_IN_FILTER:
-            _statMap.merge(StatKey.NUM_ENTRIES_SCANNED_IN_FILTER, Long.parseLong(entry.getValue()));
-            break;
-          case NUM_ENTRIES_SCANNED_POST_FILTER:
-            _statMap.merge(StatKey.NUM_ENTRIES_SCANNED_POST_FILTER, Long.parseLong(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_QUERIED:
-            _statMap.merge(StatKey.NUM_SEGMENTS_QUERIED, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_PROCESSED:
-            _statMap.merge(StatKey.NUM_SEGMENTS_PROCESSED, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_MATCHED:
-            _statMap.merge(StatKey.NUM_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_CONSUMING_SEGMENTS_QUERIED:
-            _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_QUERIED, Integer.parseInt(entry.getValue()));
-            break;
-          case MIN_CONSUMING_FRESHNESS_TIME_MS:
-            _statMap.merge(StatKey.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.parseLong(entry.getValue()));
-            break;
-          case TOTAL_DOCS:
-            _statMap.merge(StatKey.TOTAL_DOCS, Long.parseLong(entry.getValue()));
-            break;
-          case GROUPS_TRIMMED:
-            _statMap.merge(StatKey.GROUPS_TRIMMED, Boolean.parseBoolean(entry.getValue()));
-            break;
-          case NUM_GROUPS_LIMIT_REACHED:
-            _statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue()));
-            break;
-          case NUM_GROUPS_WARNING_LIMIT_REACHED:
-            _statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue()));
-            break;
-          case TIME_USED_MS:
-            _statMap.merge(StatKey.EXECUTION_TIME_MS, Long.parseLong(entry.getValue()));
-            break;
-          case TRACE_INFO:
-            LOGGER.debug("Skipping trace info: {}", entry.getValue());
-            break;
-          case REQUEST_ID:
-            LOGGER.debug("Skipping request ID: {}", entry.getValue());
-            break;
-          case NUM_RESIZES:
-            _statMap.merge(StatKey.NUM_RESIZES, Integer.parseInt(entry.getValue()));
-            break;
-          case RESIZE_TIME_MS:
-            _statMap.merge(StatKey.RESIZE_TIME_MS, Long.parseLong(entry.getValue()));
-            break;
-          case THREAD_CPU_TIME_NS:
-            _statMap.merge(StatKey.THREAD_CPU_TIME_NS, Long.parseLong(entry.getValue()));
-            break;
-          case SYSTEM_ACTIVITIES_CPU_TIME_NS:
-            _statMap.merge(StatKey.SYSTEM_ACTIVITIES_CPU_TIME_NS, Long.parseLong(entry.getValue()));
-            break;
-          case RESPONSE_SER_CPU_TIME_NS:
-            _statMap.merge(StatKey.RESPONSE_SER_CPU_TIME_NS, Long.parseLong(entry.getValue()));
-            break;
-          case THREAD_MEM_ALLOCATED_BYTES:
-            _statMap.merge(StatKey.THREAD_MEM_ALLOCATED_BYTES, Long.parseLong(entry.getValue()));
-            break;
-          case RESPONSE_SER_MEM_ALLOCATED_BYTES:
-            _statMap.merge(StatKey.RESPONSE_SER_MEM_ALLOCATED_BYTES, Long.parseLong(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_PRUNED_BY_SERVER:
-            _statMap.merge(StatKey.NUM_SEGMENTS_PRUNED_BY_SERVER, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_PRUNED_INVALID:
-            _statMap.merge(StatKey.NUM_SEGMENTS_PRUNED_INVALID, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_PRUNED_BY_LIMIT:
-            _statMap.merge(StatKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_SEGMENTS_PRUNED_BY_VALUE:
-            _statMap.merge(StatKey.NUM_SEGMENTS_PRUNED_BY_VALUE, Integer.parseInt(entry.getValue()));
-            break;
-          case EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS:
-            LOGGER.debug("Skipping empty filter segments: {}", entry.getValue());
-            break;
-          case EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS:
-            LOGGER.debug("Skipping match all filter segments: {}", entry.getValue());
-            break;
-          case NUM_CONSUMING_SEGMENTS_PROCESSED:
-            _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.parseInt(entry.getValue()));
-            break;
-          case NUM_CONSUMING_SEGMENTS_MATCHED:
-            _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue()));
-            break;
-          case SORTED:
-            break;
-          default: {
-            throw new IllegalArgumentException("Unhandled V1 execution stat: " + entry.getKey());
-          }
-        }
-      }
-    }
-  }
-
   public ExplainedNode explain() {
-    Preconditions.checkState(
-        _requests.stream().allMatch(request -> request.getQueryContext().getExplain() == ExplainMode.NODE),
-        "All requests must have explain mode set to ExplainMode.NODE");
+    assert _requests.stream().allMatch(request -> request.getQueryContext().getExplain() == ExplainMode.NODE) :
+        "All requests must have explain mode set to ExplainMode.NODE";
 
     if (_executionFuture == null) {
       _executionFuture = startExecution();
     }
-
     List<PlanNode> childNodes = new ArrayList<>();
     while (true) {
+      if (_isEarlyTerminated) {
+        terminateAndClearResultsBlocks();
+        break;
+      }
       BaseResultsBlock resultsBlock;
       try {
-        // Here we could use active or passive, given we don't actually execute anything
-        long timeout = _context.getPassiveDeadlineMs() - System.currentTimeMillis();
-        resultsBlock = _blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
+        // Here we use passive deadline because we end up waiting for the SSE operators which can timeout by their own.
+        resultsBlock =
+            _blockingQueue.poll(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
+        terminateAndClearResultsBlocks();
         Thread.currentThread().interrupt();
         throw new RuntimeException("Interrupted while waiting for results 
block", e);
       }
       if (resultsBlock == null) {
+        terminateAndClearResultsBlocks();
         throw new RuntimeException("Timed out waiting for results block");
       }
-      // Terminate when receiving exception block
-      Map<Integer, String> exceptions = _exceptions;
-      if (exceptions != null) {
-        throw new RuntimeException("Received exception block: " + exceptions);
+      // Terminate when there is error block
+      ErrorMseBlock errorBlock = getErrorBlock();
+      if (errorBlock != null) {
+        terminateAndClearResultsBlocks();
+        throw new RuntimeException("Received error block: " + 
errorBlock.getErrorMessages());
       }
-      if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
+      if (resultsBlock == LAST_RESULTS_BLOCK) {
+        _terminated = true;
         break;
-      } else if (!(resultsBlock instanceof ExplainV2ResultBlock)) {
-        throw new IllegalArgumentException("Expected ExplainV2ResultBlock, got: " + resultsBlock.getClass().getName());
-      } else {
+      }
+      if (resultsBlock instanceof ExplainV2ResultBlock) {
         ExplainV2ResultBlock block = (ExplainV2ResultBlock) resultsBlock;
         for (ExplainInfo physicalPlan : block.getPhysicalPlans()) {
           childNodes.add(asNode(physicalPlan));
         }
+      } else {
+        terminateAndClearResultsBlocks();
+        throw new IllegalArgumentException("Expected ExplainV2ResultBlock, got: " + resultsBlock.getClass().getName());
       }
     }
-    String tableName = _context.getStageMetadata().getTableName();
-    Map<String, Plan.ExplainNode.AttributeValue> attributes;
-    if (tableName == null) { // this should never happen, but let's be paranoid to never fail
-      attributes = Collections.emptyMap();
-    } else {
-      attributes =
-          Collections.singletonMap("table", Plan.ExplainNode.AttributeValue.newBuilder().setString(tableName).build());
-    }
+    Map<String, Plan.ExplainNode.AttributeValue> attributes =
+        Map.of("table", 
Plan.ExplainNode.AttributeValue.newBuilder().setString(_tableName).build());
     return new ExplainedNode(_context.getStageId(), _dataSchema, null, 
childNodes, "LeafStageCombineOperator",
         attributes);
   }
 
+  @Override
+  protected StatMap<?> copyStatMaps() {
+    return new StatMap<>(_statMap);
+  }
+
+  @Override
+  protected void earlyTerminate() {
+    _isEarlyTerminated = true;
+    cancelSseTasks();
+  }
+
+  @Override
+  public void cancel(Throwable e) {
+    cancelSseTasks();
+  }
+
+  @Override
+  public void close() {
+    cancelSseTasks();
+  }
+
+  @VisibleForTesting
+  void cancelSseTasks() {
+    Future<?> executionFuture = _executionFuture;
+    if (executionFuture != null) {
+      executionFuture.cancel(true);
+    }
+  }
+
+  private synchronized void mergeExecutionStats(Map<String, String> executionStats) {

Review Comment:
   The entire method is synchronized, which could create a bottleneck if called 
frequently. Consider using more granular synchronization or concurrent data 
structures to reduce contention.
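
   For illustration only, one direction would be to accumulate the additive, long-valued stats lock-free and fold them into `_statMap` once. Everything except `StatKey` and `_statMap` below is hypothetical and not part of this PR, and it assumes `java.util.concurrent.ConcurrentHashMap` and `java.util.concurrent.atomic.LongAdder`:
   ```java
   // Hypothetical lock-free accumulator for additive long-valued stats; updated concurrently by the SSE tasks.
   private final ConcurrentHashMap<StatKey, LongAdder> _longStatAccumulator = new ConcurrentHashMap<>();

   private void accumulateLongStat(StatKey key, long value) {
     _longStatAccumulator.computeIfAbsent(key, k -> new LongAdder()).add(value);
   }

   // Folded into _statMap once when the operator finishes. Only long-typed keys go through this path;
   // non-additive keys (e.g. TABLE, boolean flags) would still need a small synchronized block.
   private synchronized void flushLongStats() {
     _longStatAccumulator.forEach((key, adder) -> _statMap.merge(key, adder.sum()));
   }
   ```
   That said, in this operator the method appears to run at most once per server request (one or two per query), so the simpler synchronized method may already be adequate.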



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

