klsince commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r941909701


##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -103,7 +115,68 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
    * @return Listenable future for query result representing serialized response. It is possible that the
    *    future may return immediately or be scheduled for execution at a later time.
    */
-  public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    ListenableFuture<byte[]> future = submitInternal(queryRequest);
+    if (_enableQueryCancellation) {
+      String queryId = queryRequest.getQueryId();
+      // Track the running query for cancellation.
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Keep track of running query: {}", queryId);
+      }
+      _queryFuturesById.put(queryId, future);
+      // And remove the track when the query ends.
+      Futures.addCallback(future, new FutureCallback<byte[]>() {
+        @Override
+        public void onSuccess(@Nullable byte[] ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on success", queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+
+        @Override
+        public void onFailure(Throwable ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on failure", queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+      }, MoreExecutors.directExecutor());
+    }
+    return future;
+  }
+
+  protected abstract ListenableFuture<byte[]> submitInternal(ServerQueryRequest queryRequest);
+
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while
+   * after calling this method. This method can be called multiple times.
+   *
+   * @param queryId a unique Id to find the query
+   * @return true if a running query exists for the given queryId.
+   */
+  public boolean cancelQuery(String queryId) {
+    // Keep the future as it'll be cleaned up by the thread executing the query.
+    Future<byte[]> future = _queryFuturesById.get(queryId);
+    if (future == null) {
+      return false;
+    }
+    boolean done = future.isDone();
+    if (!done) {
+      future.cancel(true);

Review Comment:
   Good question. I think operators either finish their computation quickly or 
wait for something (such as `blockingQueue.poll`), and those waits can be 
interrupted. 
   
   A compute operator might miss the `isInterrupted` flag at the start, but it 
should still finish quickly on its own. Better yet, we could refactor it to 
check the flag more often during the computation. 
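
   To illustrate both cases, here is a minimal, self-contained sketch. It is not 
code from this PR: `InterruptSketch`, `waitForBlock`, `compute`, `runThenCancel` 
and `CHECK_INTERVAL` are hypothetical names, and the plain `ExecutorService` only 
stands in for the scheduler. It assumes `future.cancel(true)` interrupts the 
thread running the query, and shows a blocked `poll` reacting to that interrupt 
while a compute loop notices it only because it checks the flag periodically.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch, not part of Pinot.
final class InterruptSketch {
  // Assumed value: how many iterations to run between interrupt checks.
  private static final int CHECK_INTERVAL = 1024;

  /** A waiting operator: a blocked poll() throws InterruptedException on interrupt. */
  static boolean waitForBlock(BlockingQueue<Object> queue) {
    try {
      queue.poll(30, TimeUnit.SECONDS);
      return false; // timed out or received a block without being interrupted
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag for callers
      return true;
    }
  }

  /** A compute operator: nothing blocks, so it must check the flag itself. */
  static boolean compute(long numDocs) {
    long sum = 0;
    for (long i = 0; i < numDocs; i++) {
      if (i % CHECK_INTERVAL == 0 && Thread.currentThread().isInterrupted()) {
        return true; // stop early, the query was cancelled
      }
      sum += i; // stand-in for real per-document work
    }
    return false;
  }

  /** Runs the operator on a worker thread, cancels it, and reports whether it saw the interrupt. */
  static boolean runThenCancel(Supplier<Boolean> operator) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    Future<?> future = executor.submit(() -> {
      started.countDown();
      try {
        sawInterrupt.set(operator.get());
      } finally {
        finished.countDown();
      }
    });
    started.await();       // make sure the task is actually running
    future.cancel(true);   // non-blocking: only delivers the interrupt
    finished.await(10, TimeUnit.SECONDS); // the task may run a little longer
    executor.shutdownNow();
    return sawInterrupt.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("waiting operator interrupted: "
        + runThenCancel(() -> waitForBlock(new LinkedBlockingQueue<>())));
    System.out.println("compute operator interrupted: "
        + runThenCancel(() -> compute(Long.MAX_VALUE)));
  }
}
```

   A smaller `CHECK_INTERVAL` makes cancellation more responsive, at the cost of 
checking the flag more often in the hot loop.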



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

