Jackie-Jiang commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2373810887


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -96,106 +100,127 @@ public OpChainSchedulerService(ExecutorService 
executorService, int opStatsCache
   }
 
   public void register(OpChain operatorChain) {
+    QueryExecutionContext executionContext = 
QueryThreadContext.get().getExecutionContext();
+    // Check if query is already terminated before acquiring the read lock.
+    checkTermination(operatorChain, executionContext);
     // Acquire read lock for the query to ensure that the query is not 
cancelled while scheduling the operator chain.
     long requestId = operatorChain.getId().getRequestId();
     Lock readLock = getQueryLock(requestId).readLock();
     readLock.lock();
     try {
+      // Check if query is already terminated again after acquiring the read 
lock.
+      checkTermination(operatorChain, executionContext);
       // Do not schedule the operator chain if the query has been cancelled.
       if (_cancelledQueryCache.getIfPresent(requestId) != null) {
         LOGGER.debug("({}): Query has been cancelled", operatorChain);
+        executionContext.terminate(QueryErrorCode.QUERY_CANCELLATION, 
"Cancelled by user");
         throw new QueryCancelledException(
             "Query has been cancelled before op-chain: " + 
operatorChain.getId() + " being scheduled");
       } else {
-        registerInternal(operatorChain);
+        registerInternal(operatorChain, executionContext);
       }
     } finally {
       readLock.unlock();
     }
   }
 
-  private void registerInternal(OpChain operatorChain) {
+  private void checkTermination(OpChain operatorChain, QueryExecutionContext 
executionContext) {
+    TerminationException terminateException = 
executionContext.getTerminateException();
+    if (terminateException != null) {
+      LOGGER.debug("({}): Query has been terminated", operatorChain, 
terminateException);
+      if (terminateException.getErrorCode() == 
QueryErrorCode.QUERY_CANCELLATION) {
+        throw new QueryCancelledException(
+            "Query has been cancelled before op-chain: " + 
operatorChain.getId() + " being scheduled");
+      } else {
+        throw new QueryCancelledException(
+            "Query has been terminated before op-chain: " + 
operatorChain.getId() + " being scheduled: "
+                + terminateException.getErrorCode() + " - " + 
terminateException.getMessage(), terminateException);
+      }
+    }
+  }
+
+  private void registerInternal(OpChain operatorChain, QueryExecutionContext 
executionContext) {
     OpChainId opChainId = operatorChain.getId();
     MultiStageOperator rootOperator = operatorChain.getRoot();
-    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+    _opChainCache.put(opChainId, Pair.of(rootOperator, executionContext));
+
+    // Create a ListenableFutureTask to ensure the opChain is cancelled even 
if the task is not scheduled
+    ListenableFutureTask<Void> listenableFutureTask = 
ListenableFutureTask.create(new TraceRunnable() {

Review Comment:
   IIUC, cancelling a `CompletableFuture` doesn't interrupt the underlying 
execution, which will waste resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to