klsince commented on code in PR #9171: URL: https://github.com/apache/pinot/pull/9171#discussion_r947268749
########## pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java: ########## @@ -105,6 +117,73 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re */ public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest); + /** + * Submit a query for execution and track runtime context about the query for things like cancellation. + * @param queryRequest query to schedule for execution + * @return Listenable future for query result representing serialized response. Custom callbacks can be added on + * the future to clean up the runtime context tracked during query execution. + */ + public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest) { + ListenableFuture<byte[]> future = submit(queryRequest); + if (_enableQueryCancellation) { + String queryId = queryRequest.getQueryId(); + // Track the running query for cancellation. + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Keep track of running query: {}", queryId); + } + _queryFuturesById.put(queryId, future); + // And remove the track when the query ends. + Futures.addCallback(future, new FutureCallback<byte[]>() { + @Override + public void onSuccess(@Nullable byte[] ignored) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Remove track of running query: {} on success", queryId); + } + _queryFuturesById.remove(queryId); + } + + @Override + public void onFailure(Throwable ignored) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Remove track of running query: {} on failure", queryId); + } + _queryFuturesById.remove(queryId); + } + }, MoreExecutors.directExecutor()); + } + return future; + } + + /** + * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while + * after calling this method. This method can be called multiple times. + * + * @param queryId a unique Id to find the query + * @return true if a running query exists for the given queryId. + */ + public boolean cancelQuery(String queryId) { + // Keep the future as it'll be cleaned up by the thread executing the query. + Future<byte[]> future = _queryFuturesById.get(queryId); + if (future == null) { + return false; + } + boolean done = future.isDone(); + if (!done) { Review Comment: yeah, it's noop. I got the `done` value for the debug info. btw, I see BaseCombineOperator also does such checks while canceling the futures, so followed that convention here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org