This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a655d2d0e support to show running queries and cancel query by id 
(#9171)
3a655d2d0e is described below

commit 3a655d2d0e86cf208a1d84e4d7c776fa093dbb2a
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Wed Aug 17 11:29:46 2022 -0700

    support to show running queries and cancel query by id (#9171)
---
 .../broker/api/resources/PinotClientRequest.java   |  63 ++++++++++
 .../broker/broker/BrokerAdminApiApplication.java   |  14 +++
 .../requesthandler/BaseBrokerRequestHandler.java   | 139 ++++++++++++++++++++-
 .../requesthandler/BrokerRequestHandler.java       |  19 +++
 .../BrokerRequestHandlerDelegate.java              |  19 +++
 .../BaseBrokerRequestHandlerTest.java              |  96 ++++++++++++++
 ...{MultiGetRequest.java => MultiHttpRequest.java} |  58 ++++++---
 .../pinot/common/utils/config/InstanceUtils.java   |  26 ++++
 ...tRequestTest.java => MultiHttpRequestTest.java} |   8 +-
 .../pinot/controller/BaseControllerStarter.java    |   3 +-
 .../helix/core/PinotHelixResourceManager.java      |  24 +---
 .../controller/util/CompletionServiceHelper.java   |   4 +-
 .../core/query/request/ServerQueryRequest.java     |   8 ++
 .../pinot/core/query/scheduler/QueryScheduler.java |  86 ++++++++++++-
 .../core/transport/InstanceRequestHandler.java     |   2 +-
 .../pinot/core/transport/ServerInstance.java       |   9 ++
 .../core/query/scheduler/QuerySchedulerTest.java   |  76 +++++++++++
 .../pinot/core/transport/QueryRoutingTest.java     |   2 +-
 .../pinot/server/api/resources/QueryResource.java  | 100 +++++++++++++++
 .../pinot/server/starter/ServerInstance.java       |   4 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 21 files changed, 702 insertions(+), 59 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 5c7ce3299f..e1ccd67b2f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -33,10 +33,14 @@ import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
@@ -46,6 +50,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.common.exception.QueryException;
@@ -84,6 +89,12 @@ public class PinotClientRequest {
   @Inject
   private BrokerMetrics _brokerMetrics;
 
+  @Inject
+  private Executor _executor;
+
+  @Inject
+  private HttpConnectionManager _httpConnMgr;
+
   @GET
   @ManagedAsync
   @Produces(MediaType.APPLICATION_JSON)
@@ -141,6 +152,58 @@ public class PinotClientRequest {
     }
   }
 
+  @DELETE
+  @Path("query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query as identified by the queryId", notes = 
"No effect if no query exists for the "
+      + "given queryId on the requested broker. Query may continue to run for 
a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel method can be called 
multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found on the requested 
broker")
+  })
+  public String cancelQuery(
+      @ApiParam(value = "QueryId as assigned by the broker", required = true) 
@PathParam("queryId") long queryId,
+      @ApiParam(value = "Timeout for servers to respond the cancel request") 
@QueryParam("timeoutMs")
+      @DefaultValue("3000") int timeoutMs,
+      @ApiParam(value = "Return server responses for troubleshooting") 
@QueryParam("verbose") @DefaultValue("false")
+          boolean verbose) {
+    try {
+      Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
+      if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, 
_httpConnMgr, serverResponses)) {
+        String resp = "Cancelled query: " + queryId;
+        if (verbose) {
+          resp += " with responses from servers: " + serverResponses;
+        }
+        return resp;
+      }
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(String.format("Failed to cancel query: %s on the broker due 
to error: %s", queryId, e.getMessage()))
+          .build());
+    }
+    throw new WebApplicationException(
+        
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not 
found on the broker", queryId))
+            .build());
+  }
+
+  @GET
+  @Path("queries")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get queryIds of the running queries submitted via the 
requested broker", notes = "The id is "
+      + "assigned by the requested broker and only unique at the scope of this 
broker")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public Map<Long, String> getRunningQueries() {
+    try {
+      return _requestHandler.getRunningQueries();
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Failed to get running queries on the broker due to error: " 
+ e.getMessage()).build());
+    }
+  }
+
   private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
       boolean onlyDql)
       throws Exception {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index fb8c4d6f0a..3978b65891 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -18,11 +18,17 @@
  */
 package org.apache.pinot.broker.broker;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.swagger.jaxrs.config.BeanConfig;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -57,9 +63,17 @@ public class BrokerAdminApiApplication extends 
ResourceConfig {
     if 
(brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY, 
false)) {
       register(ServiceAutoDiscoveryFeature.class);
     }
+    ExecutorService executor =
+        Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
+    MultiThreadedHttpConnectionManager connMgr = new 
MultiThreadedHttpConnectionManager();
+    connMgr.getParams().setConnectionTimeout((int) brokerConf
+        .getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
+            CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
     register(new AbstractBinder() {
       @Override
       protected void configure() {
+        bind(connMgr).to(HttpConnectionManager.class);
+        bind(executor).to(Executor.class);
         bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
         bind(routingManager).to(BrokerRoutingManager.class);
         bind(brokerRequestHandler).to(BrokerRequestHandler.class);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 6afa20c95d..f5dba937ba 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -26,18 +26,26 @@ import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.DeleteMethod;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.broker.api.HttpRequesterIdentity;
 import org.apache.pinot.broker.api.RequesterIdentity;
@@ -47,6 +55,7 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.http.MultiHttpRequest;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -126,6 +135,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   private final int _defaultHllLog2m;
   private final boolean _enableQueryLimitOverride;
   private final boolean _enableDistinctCountBitmapOverride;
+  private final Map<Long, QueryServers> _queriesById = new 
ConcurrentHashMap<>();
+  private final boolean _enableQueryCancellation;
 
   public BaseBrokerRequestHandler(PinotConfiguration config, 
BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
@@ -154,9 +165,13 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
     _numDroppedLog = new AtomicInteger(0);
     _numDroppedLogRateLimiter = RateLimiter.create(1.0);
+    _enableQueryCancellation =
+        
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
     LOGGER.info(
-        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log 
length: {}, query log max rate: {}qps",
-        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, 
_queryLogRateLimiter.getRate());
+        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log 
length: {}, query log max rate: {}qps, "
+            + "enabling query cancellation: {}",
+        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, 
_queryLogRateLimiter.getRate(),
+        _enableQueryCancellation);
   }
 
   private String getDefaultBrokerId() {
@@ -168,6 +183,74 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation 
is not enabled on broker");
+    return 
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e 
-> e.getValue()._query));
+  }
+
+  @VisibleForTesting
+  Set<ServerInstance> getRunningServers(long requestId) {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation 
is not enabled on broker");
+    QueryServers queryServers = _queriesById.get(requestId);
+    return (queryServers == null) ? Collections.emptySet() : 
queryServers._servers;
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, 
HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation 
is not enabled on broker");
+    QueryServers queryServers = _queriesById.get(queryId);
+    if (queryServers == null) {
+      return false;
+    }
+    String globalId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>();
+    for (ServerInstance server : queryServers._servers) {
+      serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), 
globalId));
+    }
+    if (serverUrls.isEmpty()) {
+      LOGGER.debug("No servers running the query: {} right now", globalId);
+      return true;
+    }
+    LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, 
serverUrls);
+    CompletionService<DeleteMethod> completionService =
+        new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, 
timeoutMs, "DELETE", DeleteMethod::new);
+    List<String> errMsgs = new ArrayList<>(serverUrls.size());
+    for (int i = 0; i < serverUrls.size(); i++) {
+      DeleteMethod deleteMethod = null;
+      try {
+        // Wait for all requests to respond before returning to be sure that 
the servers have handled the cancel
+        // requests. The completion order is different from serverUrls, thus 
use uri in the response.
+        deleteMethod = completionService.take().get();
+        URI uri = deleteMethod.getURI();
+        int status = deleteMethod.getStatusCode();
+        // Unexpected server responses are collected and returned as exception.
+        if (status != 200 && status != 404) {
+          throw new Exception(String.format("Unexpected status=%d and 
response='%s' from uri='%s'", status,
+              deleteMethod.getResponseBodyAsString(), uri));
+        }
+        if (serverResponses != null) {
+          serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to cancel query: {}", globalId, e);
+        // Can't just throw exception from here as there is a need to release 
the other connections.
+        // So just collect the error msg to throw them together after the 
for-loop.
+        errMsgs.add(e.getMessage());
+      } finally {
+        if (deleteMethod != null) {
+          deleteMethod.releaseConnection();
+        }
+      }
+    }
+    if (errMsgs.size() > 0) {
+      throw new Exception("Unexpected responses from servers: " + 
StringUtils.join(errMsgs, ","));
+    }
+    return true;
+  }
+
   @Override
   public BrokerResponseNative handleRequest(JsonNode request, @Nullable 
SqlNodeAndOptions sqlNodeAndOptions,
       @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
@@ -191,9 +274,16 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (sql == null) {
       throw new BadQueryRequestException("Failed to find 'sql' in the request: 
" + request);
     }
-    String query = sql.asText();
-    requestContext.setQuery(query);
-    return handleRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext);
+    try {
+      String query = sql.asText();
+      requestContext.setQuery(query);
+      return handleRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext);
+    } finally {
+      if (_enableQueryCancellation) {
+        _queriesById.remove(requestId);
+        LOGGER.debug("Remove track of running query: {}", requestId);
+      }
+    }
   }
 
   private BrokerResponseNative handleRequest(long requestId, String query,
@@ -576,6 +666,19 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         realtimeRoutingTable = null;
       }
     }
+    if (_enableQueryCancellation) {
+      // Start to track the running query for cancellation just before sending 
it out to servers to avoid any potential
+      // failures that could happen before sending it out, like failures to 
calculate the routing table etc.
+      // TODO: Even tracking the query as late as here, a potential race 
condition between calling cancel API and
+      //       query being sent out to servers can still happen. If cancel 
request arrives earlier than query being
+      //       sent out to servers, the servers miss the cancel request and 
continue to run the queries. The users
+      //       can always list the running queries and cancel query again 
until it ends. Just that such race
+      //       condition makes cancel API less reliable. This should be rare 
as it assumes sending queries out to
+      //       servers takes time, but will address later if needed.
+      QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k -> 
new QueryServers(query));
+      LOGGER.debug("Keep track of running query: {}", requestId);
+      queryServers.addServers(offlineRoutingTable, realtimeRoutingTable);
+    }
     // TODO: Modify processBrokerRequest() to directly take PinotQuery
     BrokerResponseNative brokerResponse =
         processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, 
offlineBrokerRequest, offlineRoutingTable,
@@ -1650,6 +1753,10 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
     statistics.setNumRowsResultSet(response.getNumRowsResultSet());
   }
 
+  private String getGlobalQueryId(long requestId) {
+    return _brokerId + "_" + requestId;
+  }
+
   /**
    * Helper class to pass the per server statistics.
    */
@@ -1664,4 +1771,26 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
       _serverStats = serverStats;
     }
   }
+
+  /**
+   * Helper class to track the query plaintext and the requested servers.
+   */
+  private static class QueryServers {
+    private final String _query;
+    private final Set<ServerInstance> _servers = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+
+    public QueryServers(String query) {
+      _query = query;
+    }
+
+    public void addServers(Map<ServerInstance, List<String>> 
offlineRoutingTable,
+        Map<ServerInstance, List<String>> realtimeRoutingTable) {
+      if (offlineRoutingTable != null) {
+        _servers.addAll(offlineRoutingTable.keySet());
+      }
+      if (realtimeRoutingTable != null) {
+        _servers.addAll(realtimeRoutingTable.keySet());
+      }
+    }
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 799bfe8295..de42134a6a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -19,8 +19,11 @@
 package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Map;
+import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.spi.trace.RequestContext;
@@ -43,4 +46,20 @@ public interface BrokerRequestHandler {
       throws Exception {
     return handleRequest(request, null, requesterIdentity, requestContext);
   }
+
+  Map<Long, String> getRunningQueries();
+
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking 
so the query may still run for a while
+   * after calling this method. This cancel method can be called multiple 
times.
+   * @param queryId the unique Id assigned to the query by the broker
+   * @param timeoutMs timeout to wait for servers to respond the cancel 
requests
+   * @param executor to send cancel requests to servers in parallel
+   * @param connMgr to provide the http connections
+   * @param serverResponses to collect cancel responses from all servers if a 
map is provided
+   * @return true if there is a running query for the given queryId.
+   */
+  boolean cancelQuery(long queryId, int timeoutMs, Executor executor, 
HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception;
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index ba22f3f481..7a2a085273 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -20,7 +20,9 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
+import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -116,4 +118,21 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
     }
     return false;
   }
+
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    // TODO: add support for multiStaged engine: track running queries for 
multiStaged engine and combine its
+    //       running queries with those from singleStaged engine. Both engines 
share the same request Id generator, so
+    //       the query will have unique ids across the two engines.
+    return _singleStageBrokerRequestHandler.getRunningQueries();
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, 
HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses)
+      throws Exception {
+    // TODO: add support for multiStaged engine, basically try to cancel the 
query on multiStaged engine firstly; if
+    //       not found, try on the singleStaged engine.
+    return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, 
executor, connMgr, serverResponses);
+  }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index f4e09259ff..cff4eff193 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -18,20 +18,46 @@
  */
 package org.apache.pinot.broker.requesthandler;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 
@@ -156,4 +182,74 @@ public class BaseBrokerRequestHandlerTest {
     
Assert.assertEquals(BaseBrokerRequestHandler.getActualTableName("db.namespace.mytable",
 tableCache),
         "db.namespace.mytable");
   }
+
+  @Test
+  public void testCancelQuery()
+      throws Exception {
+    String tableName = "myTable_OFFLINE";
+    // Mock pretty much everything until the query can be submitted.
+    TableCache tableCache = mock(TableCache.class);
+    TableConfig tableCfg = mock(TableConfig.class);
+    when(tableCache.getActualTableName(anyString())).thenReturn(tableName);
+    TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null);
+    when(tableCfg.getTenantConfig()).thenReturn(tenant);
+    when(tableCache.getTableConfig(anyString())).thenReturn(tableCfg);
+    BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
+    when(routingManager.routingExists(anyString())).thenReturn(true);
+    RoutingTable rt = mock(RoutingTable.class);
+    when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
+        .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), 
Collections.singletonList("segment01")));
+    when(routingManager.getRoutingTable(any())).thenReturn(rt);
+    QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
+    when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+    CountDownLatch latch = new CountDownLatch(1);
+    PinotConfiguration config =
+        new 
PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation",
 "true"));
+    BaseBrokerRequestHandler requestHandler =
+        new BaseBrokerRequestHandler(config, routingManager, new 
AllowAllAccessControlFactory(),
+            queryQuotaManager, tableCache,
+            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), 
true, Collections.emptySet())) {
+          @Override
+          public void start() {
+          }
+
+          @Override
+          public void shutDown() {
+          }
+
+          @Override
+          protected BrokerResponseNative processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
+              BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest,
+              @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+              @Nullable BrokerRequest realtimeBrokerRequest,
+              @Nullable Map<ServerInstance, List<String>> 
realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
+              RequestContext requestContext)
+              throws Exception {
+            latch.await();
+            return null;
+          }
+        };
+    CompletableFuture.runAsync(() -> {
+      try {
+        JsonNode request = JsonUtils.stringToJsonNode(
+            String.format("{\"sql\":\"select * from %s limit 
10\",\"queryOptions\":\"timeoutMs=10000\"}", tableName));
+        RequestContext requestStats = Tracing.getTracer().createRequestScope();
+        requestHandler.handleRequest(request, null, requestStats);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    TestUtils.waitForCondition((aVoid) -> 
requestHandler.getRunningServers(1).size() == 1, 500, 5000,
+        "Failed to submit query");
+    Map.Entry<Long, String> entry = 
requestHandler.getRunningQueries().entrySet().iterator().next();
+    Assert.assertEquals(entry.getKey().longValue(), 1);
+    Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE 
limit 10"));
+    Set<ServerInstance> servers = requestHandler.getRunningServers(1);
+    Assert.assertEquals(servers.size(), 1);
+    Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
+    Assert.assertEquals(servers.iterator().next().getPort(), 9000);
+    Assert.assertEquals(servers.iterator().next().getInstanceId(), 
"server01_9000");
+    Assert.assertEquals(servers.iterator().next().getAdminEndpoint(), 
"http://server01:8097");
+    latch.countDown();
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java 
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
similarity index 60%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
index 9f6e3f3158..f66c51d2e1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
@@ -23,9 +23,11 @@ import java.util.Map;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.HttpMethodBase;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.params.HttpClientParams;
 import org.slf4j.Logger;
@@ -33,23 +35,21 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Class to support multiple http GET operations in parallel by using
- * the executor that is passed in.
+ * Class to support multiple http operations in parallel by using the executor 
that is passed in. This is a wrapper
+ * around Apache common HTTP client.
  *
- * This is a wrapper around Apache common HTTP client.
+ * The execute method is re-usable but there is no real benefit to it. All the 
connection management is handled by
+ * the input HttpConnectionManager. Note that we cannot use 
SimpleHttpConnectionManager as it is not thread safe. Use
+ * MultiThreadedHttpConnectionManager as shown in the example below. As GET is 
commonly used, there is a dedicated
+ * execute method for it. Other http methods like DELETE can use the generic 
version of execute method.
  *
- * The execute method is re-usable but there is no real benefit to it. All
- * the connection management is handled by the input HttpConnectionManager.
- * Note that we cannot use SimpleHttpConnectionManager as it is not thread
- * safe. Use MultiThreadedHttpConnectionManager as shown in the example
- * below
  * Usage:
  * <pre>
  * {@code
  *    List<String> urls = Arrays.asList("http://www.linkedin.com", 
"http://www.google.com");
- *    MultiGetRequest mget = new 
MultiGetRequest(Executors.newCachedThreadPool(),
+ *    MultiHttpRequest mhr = new 
MultiHttpRequest(Executors.newCachedThreadPool(),
  *           new MultiThreadedHttpConnectionManager());
- *    CompletionService<GetMethod> completionService = mget.execute(urls);
+ *    CompletionService<GetMethod> completionService = mhr.execute(urls, 
headers, timeoutMs);
  *    for (int i = 0; i < urls.size(); i++) {
  *      GetMethod getMethod = null;
  *      try {
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  */
-public class MultiGetRequest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiGetRequest.class);
+public class MultiHttpRequest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiHttpRequest.class);
 
   private final Executor _executor;
   // TODO: Verify that _connectionManager is an instaceOf 
MultithreadedHttpConnectionManager.
@@ -84,7 +84,7 @@ public class MultiGetRequest {
    * @param executor executor service to use for making parallel requests
    * @param connectionManager http connection manager to use.
    */
-  public MultiGetRequest(Executor executor, HttpConnectionManager 
connectionManager) {
+  public MultiHttpRequest(Executor executor, HttpConnectionManager 
connectionManager) {
     _executor = executor;
     _connectionManager = connectionManager;
   }
@@ -99,24 +99,42 @@ public class MultiGetRequest {
    */
   public CompletionService<GetMethod> execute(List<String> urls, @Nullable 
Map<String, String> requestHeaders,
       int timeoutMs) {
+    return execute(urls, requestHeaders, timeoutMs, "GET", GetMethod::new);
+  }
+
+  /**
+   * Execute certain http method on the urls in parallel using the executor 
service.
+   * @param urls absolute URLs to execute the http method
+   * @param requestHeaders headers to set when making the request
+   * @param timeoutMs timeout in milliseconds for each http request
+   * @param httpMethodName the name of the http method like GET, DELETE etc.
+   * @param httpMethodSupplier a function to create a new http method object.
+   * @return instance of CompletionService. Completion service will provide
+   *   results as they arrive. The order is NOT same as the order of URLs
+   */
+  public <T extends HttpMethodBase> CompletionService<T> execute(List<String> 
urls,
+      @Nullable Map<String, String> requestHeaders, int timeoutMs, String 
httpMethodName,
+      Function<String, T> httpMethodSupplier) {
     HttpClientParams clientParams = new HttpClientParams();
     clientParams.setConnectionManagerTimeout(timeoutMs);
     HttpClient client = new HttpClient(clientParams, _connectionManager);
 
-    CompletionService<GetMethod> completionService = new 
ExecutorCompletionService<>(_executor);
+    CompletionService<T> completionService = new 
ExecutorCompletionService<>(_executor);
     for (String url : urls) {
       completionService.submit(() -> {
         try {
-          GetMethod getMethod = new GetMethod(url);
+          T httpMethod = httpMethodSupplier.apply(url);
+          // Explicitly cast type downwards to workaround a bug in jdk8: 
https://bugs.openjdk.org/browse/JDK-8056984
+          HttpMethodBase httpMethodBase = httpMethod;
           if (requestHeaders != null) {
-            requestHeaders.forEach(getMethod::setRequestHeader);
+            requestHeaders.forEach((k, v) -> 
httpMethodBase.setRequestHeader(k, v));
           }
-          getMethod.getParams().setSoTimeout(timeoutMs);
-          client.executeMethod(getMethod);
-          return getMethod;
+          httpMethodBase.getParams().setSoTimeout(timeoutMs);
+          client.executeMethod(httpMethodBase);
+          return httpMethod;
         } catch (Exception e) {
           // Log only exception type and message instead of the whole stack 
trace
-          LOGGER.warn("Caught '{}' while executing GET on URL: {}", 
e.toString(), url);
+          LOGGER.warn("Caught '{}' while executing: {} on URL: {}", e, 
httpMethodName, url);
           throw e;
         }
       });
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
index 41e88b5cf0..c840674390 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 
 
@@ -59,6 +60,31 @@ public class InstanceUtils {
     return prefix + instance.getHost() + "_" + instance.getPort();
   }
 
+  public static String getServerAdminEndpoint(InstanceConfig instanceConfig) {
+    // Backward-compatible with legacy hostname of format 'Server_<hostname>'
+    String hostname = instanceConfig.getHostName();
+    if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+      hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
+    }
+    return getServerAdminEndpoint(instanceConfig, hostname, 
CommonConstants.HTTP_PROTOCOL);
+  }
+
+  public static String getServerAdminEndpoint(InstanceConfig instanceConfig, 
String hostname, String defaultProtocol) {
+    String protocol = defaultProtocol;
+    int port = CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+    int adminPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1);
+    int adminHttpsPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1);
+    // NOTE: preference for insecure is sub-optimal, but required for 
incremental upgrade scenarios
+    if (adminPort > 0) {
+      protocol = CommonConstants.HTTP_PROTOCOL;
+      port = adminPort;
+    } else if (adminHttpsPort > 0) {
+      protocol = CommonConstants.HTTPS_PROTOCOL;
+      port = adminHttpsPort;
+    }
+    return String.format("%s://%s:%d", protocol, hostname, port);
+  }
+
   /**
    * Returns the Helix InstanceConfig for the given instance.
    */
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
similarity index 96%
rename from 
pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
rename to 
pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
index 8cfd5a4cfe..95247eca13 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
@@ -42,8 +42,8 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 
-public class MultiGetRequestTest {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiGetRequest.class);
+public class MultiHttpRequestTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiHttpRequest.class);
   private static final String SUCCESS_MSG = "success";
   private static final String ERROR_MSG = "error";
   private static final String TIMEOUT_MSG = "Timeout";
@@ -106,8 +106,8 @@ public class MultiGetRequestTest {
 
   @Test
   public void testMultiGet() {
-    MultiGetRequest mget =
-        new MultiGetRequest(Executors.newCachedThreadPool(), new 
MultiThreadedHttpConnectionManager());
+    MultiHttpRequest mget =
+        new MultiHttpRequest(Executors.newCachedThreadPool(), new 
MultiThreadedHttpConnectionManager());
     List<String> urls = Arrays.asList("http://localhost:" + 
String.valueOf(_portStart) + URI_PATH,
         "http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH,
         "http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index b52e86197d..9f4a35eb3c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -207,8 +207,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       // Do not use this before the invocation of {@link 
PinotHelixResourceManager::start()}, which happens in {@link
       // ControllerStarter::start()}
       _helixResourceManager = new PinotHelixResourceManager(_config);
+      // This executor service is used to do async tasks from multiget util or 
table rebalancing.
       _executorService =
-          Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+          Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
     }
 
     // Initialize the table config tuner registry.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4eba9a8d5b..9d061d290a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -158,7 +158,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -226,28 +225,7 @@ public class PinotHelixResourceManager {
               public String load(String instanceId) {
                 InstanceConfig instanceConfig = 
getHelixInstanceConfig(instanceId);
                 Preconditions.checkNotNull(instanceConfig, "Failed to find 
instance config for: %s", instanceId);
-                // Backward-compatible with legacy hostname of format 
'Server_<hostname>'
-                String hostname = instanceConfig.getHostName();
-                if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
-                  hostname = 
hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
-                }
-
-                String protocol = CommonConstants.HTTP_PROTOCOL;
-                int port = Server.DEFAULT_ADMIN_API_PORT;
-
-                int adminPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1);
-                int adminHttpsPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1);
-
-                // NOTE: preference for insecure is sub-optimal, but required 
for incremental upgrade scenarios
-                if (adminPort > 0) {
-                  protocol = CommonConstants.HTTP_PROTOCOL;
-                  port = adminPort;
-                } else if (adminHttpsPort > 0) {
-                  protocol = CommonConstants.HTTPS_PROTOCOL;
-                  port = adminHttpsPort;
-                }
-
-                return String.format("%s://%s:%d", protocol, hostname, port);
+                return InstanceUtils.getServerAdminEndpoint(instanceConfig);
               }
             });
     _tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index 4a1bbb2623..8ec3b28238 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.URI;
 import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.http.MultiHttpRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +79,7 @@ public class CompletionServiceHelper {
 
     // TODO: use some service other than completion service so that we know 
which server encounters the error
     CompletionService<GetMethod> completionService =
-        new MultiGetRequest(_executor, 
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
+        new MultiHttpRequest(_executor, 
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
     for (int i = 0; i < serverURLs.size(); i++) {
       GetMethod getMethod = null;
       try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 09bfa4ad2f..c272c7ffed 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -87,6 +87,14 @@ public class ServerQueryRequest {
     _timerContext = new TimerContext(_queryContext.getTableName(), 
serverMetrics, queryArrivalTimeMs);
   }
 
+  /**
+   * As _requestId can be same across brokers, so use _brokerId and _requestId 
together to uniquely identify a query.
+   * @return unique query Id within a pinot cluster.
+   */
+  public String getQueryId() {
+    return _brokerId + "_" + _requestId;
+  }
+
   public long getRequestId() {
     return _requestId;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 069a84337c..f55cdb35f9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -19,12 +19,18 @@
 package org.apache.pinot.core.query.scheduler;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
@@ -62,6 +68,7 @@ public abstract class QueryScheduler {
   private static final String INVALID_NUM_RESIZES = "-1";
   private static final String INVALID_RESIZE_TIME_MS = "-1";
   private static final String QUERY_LOG_MAX_RATE_KEY = 
"query.log.maxRatePerSecond";
+  private static final String ENABLE_QUERY_CANCELLATION_KEY = 
"enable.query.cancellation";
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
   protected final ServerMetrics _serverMetrics;
   protected final QueryExecutor _queryExecutor;
@@ -70,8 +77,9 @@ public abstract class QueryScheduler {
   private final RateLimiter _queryLogRateLimiter;
   private final RateLimiter _numDroppedLogRateLimiter;
   private final AtomicInteger _numDroppedLogCounter;
+  private final boolean _enableQueryCancellation;
   protected volatile boolean _isRunning = false;
-
+  private final Map<String, Future<byte[]>> _queryFuturesById = new 
ConcurrentHashMap<>();
   /**
    * Constructor to initialize QueryScheduler
    * @param queryExecutor QueryExecutor engine to use
@@ -93,8 +101,12 @@ public abstract class QueryScheduler {
     _queryLogRateLimiter = 
RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY, 
DEFAULT_QUERY_LOG_MAX_RATE));
     _numDroppedLogRateLimiter = RateLimiter.create(1.0d);
     _numDroppedLogCounter = new AtomicInteger(0);
-
     LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
+
+    _enableQueryCancellation = 
Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY));
+    if (_enableQueryCancellation) {
+      LOGGER.info("Enable query cancellation");
+    }
   }
 
   /**
@@ -105,6 +117,76 @@ public abstract class QueryScheduler {
    */
   public abstract ListenableFuture<byte[]> submit(ServerQueryRequest 
queryRequest);
 
+  /**
+   * Submit a query for execution and track runtime context about the query 
for things like cancellation.
+   * @param queryRequest query to schedule for execution
+   * @return Listenable future for query result representing serialized 
response. Custom callbacks can be added on
+   * the future to clean up the runtime context tracked during query execution.
+   */
+  public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest) 
{
+    ListenableFuture<byte[]> future = submit(queryRequest);
+    if (_enableQueryCancellation) {
+      String queryId = queryRequest.getQueryId();
+      // Track the running query for cancellation.
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Keep track of running query: {}", queryId);
+      }
+      _queryFuturesById.put(queryId, future);
+      // And remove the track when the query ends.
+      Futures.addCallback(future, new FutureCallback<byte[]>() {
+        @Override
+        public void onSuccess(@Nullable byte[] ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on success", 
queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+
+        @Override
+        public void onFailure(Throwable ignored) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on failure", 
queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
+      }, MoreExecutors.directExecutor());
+    }
+    return future;
+  }
+
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking 
and the query may still run for a while
+   * after calling this method. This method can be called multiple times.
+   * TODO: refine the errmsg when query is cancelled, instead of bubbling up 
the executor's CancellationException.
+   *
+   * @param queryId a unique Id to find the query
+   * @return true if a running query exists for the given queryId.
+   */
+  public boolean cancelQuery(String queryId) {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation 
is not enabled on server");
+    // Keep the future as it'll be cleaned up by the thread executing the 
query.
+    Future<byte[]> future = _queryFuturesById.get(queryId);
+    if (future == null) {
+      return false;
+    }
+    boolean done = future.isDone();
+    if (!done) {
+      future.cancel(true);
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Cancelled query: {} that's done: {}", queryId, done);
+    }
+    return true;
+  }
+
+  /**
+   * @return list of ids of the queries currently running on the server.
+   */
+  public Set<String> getRunningQueryIds() {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation 
is not enabled on server");
+    return new HashSet<>(_queryFuturesById.keySet());
+  }
+
   /**
    * Query scheduler name for logging
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index af841eea5b..31d0c8d536 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -120,7 +120,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       tableNameWithType = queryRequest.getTableNameWithType();
 
       // Submit query for execution and register callback for execution 
results.
-      Futures.addCallback(_queryScheduler.submit(queryRequest),
+      Futures.addCallback(_queryScheduler.submitQuery(queryRequest),
           createCallback(ctx, tableNameWithType, queryArrivalTimeMs, 
instanceRequest, queryRequest),
           MoreExecutors.directExecutor());
     } catch (Exception e) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
index 149c1b0559..8809e4a015 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
@@ -22,7 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 
 
@@ -43,6 +45,7 @@ public class ServerInstance {
 
   private final int _queryServicePort;
   private final int _queryMailboxPort;
+  private final String _adminEndpoint;
 
   /**
    * By default (auto joined instances), server instance name is of format: 
{@code Server_<hostname>_<port>}, e.g.
@@ -75,6 +78,7 @@ public class ServerInstance {
         INVALID_PORT);
     _queryMailboxPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
         INVALID_PORT);
+    _adminEndpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig, 
_hostname, CommonConstants.HTTP_PROTOCOL);
   }
 
   @VisibleForTesting
@@ -86,6 +90,7 @@ public class ServerInstance {
     _nettyTlsPort = INVALID_PORT;
     _queryServicePort = INVALID_PORT;
     _queryMailboxPort = INVALID_PORT;
+    _adminEndpoint = null;
   }
 
   public String getInstanceId() {
@@ -100,6 +105,10 @@ public class ServerInstance {
     return _port;
   }
 
+  public String getAdminEndpoint() {
+    return _adminEndpoint;
+  }
+
   public int getGrpcPort() {
     return _grpcPort;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
new file mode 100644
index 0000000000..b2bf14a1f2
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class QuerySchedulerTest {
+  @Test
+  public void testCancelQuery() {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty("enable.query.cancellation", "true");
+    QueryScheduler qs = createQueryScheduler(config);
+    Set<String> queryIds = new HashSet<>();
+    queryIds.add("foo");
+    queryIds.add("bar");
+    queryIds.add("baz");
+    for (String id : queryIds) {
+      ServerQueryRequest query = mock(ServerQueryRequest.class);
+      when(query.getQueryId()).thenReturn(id);
+      qs.submitQuery(query);
+    }
+    Assert.assertEquals(qs.getRunningQueryIds(), queryIds);
+    for (String id : queryIds) {
+      qs.cancelQuery(id);
+    }
+    Assert.assertTrue(qs.getRunningQueryIds().isEmpty());
+    Assert.assertFalse(qs.cancelQuery("unknown"));
+  }
+
+  private QueryScheduler createQueryScheduler(PinotConfiguration config) {
+    return new QueryScheduler(config, mock(QueryExecutor.class), 
mock(ResourceManager.class), mock(ServerMetrics.class),
+        new LongAccumulator(Long::max, 0)) {
+      @Override
+      public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+        // Create a FutureTask does nothing but waits to be cancelled and 
trigger callbacks.
+        return ListenableFutureTask.create(() -> null);
+      }
+
+      @Override
+      public String name() {
+        return "noop";
+      }
+    };
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 01191ce79d..618223cf39 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -70,7 +70,7 @@ public class QueryRoutingTest {
 
   private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] 
responseBytes) {
     QueryScheduler queryScheduler = mock(QueryScheduler.class);
-    when(queryScheduler.submit(any())).thenAnswer(invocation -> {
+    when(queryScheduler.submitQuery(any())).thenAnswer(invocation -> {
       Thread.sleep(responseDelayMs);
       return Futures.immediateFuture(responseBytes);
     });
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
new file mode 100644
index 0000000000..b6c83ca94b
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * API to cancel query running on the server, given a queryId.
+ */
+@Api(tags = "Query", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class QueryResource {
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @DELETE
+  @Path("/query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query running on the server as identified by 
the queryId", notes = "No effect if "
+      + "no query exists for the given queryId. Query may continue to run for 
a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel API can be called 
multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found running on the 
server")
+  })
+  public String cancelQuery(
+      @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>", 
required = true) @PathParam("queryId")
+          String queryId) {
+    try {
+      if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
+        return "Cancelled query: " + queryId;
+      }
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity(String.format("Failed to cancel query: %s on the server due 
to error: %s", queryId, e.getMessage()))
+          .build());
+    }
+    throw new WebApplicationException(
+        
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not 
found on the server", queryId))
+            .build());
+  }
+
+  @GET
+  @Path("/queries/id")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get queryIds of running queries on the server", notes 
= "QueryIds are in the format of "
+      + "<brokerId>_<requestId>")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public Set<String> getRunningQueryIds() {
+    try {
+      return _serverInstance.getQueryScheduler().getRunningQueryIds();
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Failed to get queryIds of running queries on the server due 
to error: " + e.getMessage()).build());
+    }
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 7ed0a57989..eff4a1cc05 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -258,4 +258,8 @@ public class ServerInstance {
   public long getLatestQueryTime() {
     return _latestQueryTime.get();
   }
+
+  public QueryScheduler getQueryScheduler() {
+    return _queryScheduler;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c24702b0dc..ec2719b47e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -205,6 +205,7 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = 
Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
         "pinot.broker.query.log.maxRatePerSecond";
+    public static final String CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION = 
"pinot.broker.enable.query.cancellation";
     public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 
10_000d;
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = 
"pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to