Jackie-Jiang commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r944896070


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -141,6 +152,49 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
     }
   }
 
+  @DELETE
+  @Path("query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+      + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel method can be called multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found on the requested broker")
+  })
+  public String cancelQuery(
+      @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId,
+      @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
+      @DefaultValue("3000") int timeoutMs,
+      @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("detailed") @DefaultValue("false")

Review Comment:
   (minor) Consider renaming `detailed` to `verbose`.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
+  }
+
+  @VisibleForTesting
+  Set<ServerInstance> getRunningServers(long queryId) {
+    if (_queriesById.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return _queriesById.get(queryId)._servers;

Review Comment:
   Can this throw an NPE?
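   A minimal sketch of an NPE-safe variant, assuming a simplified stand-in for `QueryRoutingTable` that stores server names as strings instead of `ServerInstance`. `_queriesById.isEmpty()` does not guarantee that `queryId` is present, so `get()` can still return `null` while other queries are running or after this one has finished; a single `get()` followed by a null check avoids both the NPE and the double lookup.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RunningServersSketch {
  // Hypothetical stand-in for the PR's QueryRoutingTable; servers are plain names here.
  static final class QueryRoutingTable {
    final String _query;
    final Set<String> _servers = ConcurrentHashMap.newKeySet();

    QueryRoutingTable(String query) {
      _query = query;
    }
  }

  final Map<Long, QueryRoutingTable> _queriesById = new ConcurrentHashMap<>();

  // One lookup: an empty map and a missing key both yield an empty set instead of an NPE.
  Set<String> getRunningServers(long queryId) {
    QueryRoutingTable table = _queriesById.get(queryId);
    return table == null ? Collections.emptySet() : table._servers;
  }
}
```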



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -190,7 +259,17 @@ public BrokerResponseNative handleRequest(JsonNode request, @Nullable RequesterI
     if (sql == null) {
       throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request);
     }
-    return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext);
+    if (!_enableQueryCancellation) {
+      return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext);
+    }
+    _queriesById.put(requestId, new QueryRoutingTable(sql.asText()));

Review Comment:
   (minor) We might want to pass the `QueryRoutingTable` into `handleRequest()` to avoid an extra lookup. It can also avoid an NPE if the request is somehow removed early (probably not possible right now).
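   A rough sketch of the suggestion, using hypothetical simplified types (`PassTableSketch`, a three-argument `handleRequest` overload, and a placeholder routing step that adds a fixed server name). The outer method creates and registers the table, hands the same object down, and removes the entry in `finally`, so the inner method never re-reads the map and cannot observe a missing entry.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PassTableSketch {
  // Hypothetical stand-in for the PR's QueryRoutingTable.
  static final class QueryRoutingTable {
    final String _query;
    final Set<String> _servers = ConcurrentHashMap.newKeySet();

    QueryRoutingTable(String query) {
      _query = query;
    }
  }

  final Map<Long, QueryRoutingTable> _queriesById = new ConcurrentHashMap<>();

  String handleRequest(long requestId, String sql) {
    QueryRoutingTable table = new QueryRoutingTable(sql);
    _queriesById.put(requestId, table);
    try {
      // Pass the table down instead of looking it up again by requestId.
      return handleRequest(requestId, sql, table);
    } finally {
      _queriesById.remove(requestId);
    }
  }

  // Hypothetical inner overload: records routed servers directly on the table it was given.
  String handleRequest(long requestId, String sql, QueryRoutingTable table) {
    table._servers.add("server_1"); // placeholder for real routing
    return "routed " + requestId + " to " + table._servers;
  }
}
```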



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -125,6 +134,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   private final int _defaultHllLog2m;
   private final boolean _enableQueryLimitOverride;
   private final boolean _enableDistinctCountBitmapOverride;
+  private final Map<Long, QueryRoutingTable> _queriesById = new ConcurrentHashMap<>();

Review Comment:
   (minor) set it to `null` when query cancellation is disabled
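   A minimal sketch, assuming the map only needs to exist when the feature is enabled; the class and method names (`OptionalTrackingSketch`, `onQueryStart`, `onQueryEnd`, `trackedQueryCount`) are hypothetical. With a `null` map, the disabled path skips the per-query bookkeeping entirely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OptionalTrackingSketch {
  // Only allocated when query cancellation is enabled; null otherwise.
  private final Map<Long, String> _queriesById;

  OptionalTrackingSketch(boolean enableQueryCancellation) {
    _queriesById = enableQueryCancellation ? new ConcurrentHashMap<Long, String>() : null;
  }

  void onQueryStart(long requestId, String sql) {
    if (_queriesById != null) {
      _queriesById.put(requestId, sql);
    }
  }

  void onQueryEnd(long requestId) {
    if (_queriesById != null) {
      _queriesById.remove(requestId);
    }
  }

  int trackedQueryCount() {
    return _queriesById == null ? 0 : _queriesById.size();
  }
}
```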



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java:
##########
@@ -36,4 +39,19 @@ public interface BrokerRequestHandler {
   BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
       RequestContext requestContext)
       throws Exception;
+
+  Map<Long, String> getRunningQueries();
+
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking so the query may still run for a while
+   * after calling this method. This cancel method can be called multiple times.
+   * @param queryId the unique Id assigned to the query by the broker
+   * @param timeoutMs timeout to wait for servers to respond the cancel requests
+   * @param executor to send cancel requests to servers in parallel
+   * @param connMgr to provide the http connections
+   * @param serverResponses to collect cancel responses from all servers if a map is provided
+   * @return true if there is a running query for the given queryId.
+   */
+  boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,

Review Comment:
   Should it throw `TimeoutException` (and maybe `InterruptedException`) when a server doesn't respond within the timeout?
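   If the answer is yes, one option is to wait on the `CompletionService` with `poll(timeout, unit)` against a shared deadline and throw `TimeoutException` when it returns `null`, instead of blocking in `take()`. A sketch under that assumption, with plain `Callable<Integer>` tasks (each returning an HTTP status code) standing in for the `DeleteMethod` requests; `awaitAll` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelWithTimeoutSketch {
  // Waits for every task within one shared deadline and surfaces a timeout instead of hiding it.
  static List<Integer> awaitAll(Executor executor, List<Callable<Integer>> tasks, long timeoutMs)
      throws InterruptedException, TimeoutException, ExecutionException {
    CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
    for (Callable<Integer> task : tasks) {
      completionService.submit(task);
    }
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    List<Integer> statusCodes = new ArrayList<>();
    for (int i = 0; i < tasks.size(); i++) {
      // A non-positive remaining time makes poll() return immediately.
      Future<Integer> done = completionService.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
      if (done == null) {
        throw new TimeoutException(
            (tasks.size() - i) + " of " + tasks.size() + " requests did not complete within " + timeoutMs + " ms");
      }
      statusCodes.add(done.get());
    }
    return statusCodes;
  }
}
```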



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));

Review Comment:
   Check and throw an exception (with a proper exception message) when query cancellation is not enabled. Same for the other APIs that require query cancellation to be enabled. We don't want to return an empty map because that can confuse the user.
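   A sketch of the guard, assuming the map is `null` when the feature is disabled. `CancellationGuardSketch`, `checkCancellationEnabled()`, and the exception message are illustrative rather than taken from the PR, and the broker config key that enables the feature is not named here. The simplified `cancelQuery` only checks for the entry; the real method would also send the cancel requests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CancellationGuardSketch {
  // Null when query cancellation is disabled.
  private final Map<Long, String> _queriesById;

  CancellationGuardSketch(boolean enableQueryCancellation) {
    _queriesById = enableQueryCancellation ? new ConcurrentHashMap<Long, String>() : null;
  }

  // Fails loudly instead of returning an empty map that looks like "no queries are running".
  private void checkCancellationEnabled() {
    if (_queriesById == null) {
      throw new IllegalStateException("Query cancellation is not enabled on this broker");
    }
  }

  Map<Long, String> getRunningQueries() {
    checkCancellationEnabled();
    return new HashMap<>(_queriesById);
  }

  // Simplified: only reports whether the query is known.
  boolean cancelQuery(long queryId) {
    checkCancellationEnabled();
    return _queriesById.containsKey(queryId);
  }
}
```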



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
+  }
+
+  @VisibleForTesting
+  Set<ServerInstance> getRunningServers(long queryId) {
+    if (_queriesById.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return _queriesById.get(queryId)._servers;
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses) {
+    QueryRoutingTable routingTable = _queriesById.get(queryId);
+    if (routingTable == null) {
+      return false;
+    }
+    String globalId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>();
+    for (ServerInstance server : routingTable._servers) {
+      serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId));
+    }
+    if (serverUrls.isEmpty()) {
+      LOGGER.debug("No servers running the query: {} right now", globalId);
+      return true;
+    }
+    LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls);
+    CompletionService<DeleteMethod> completionService =

Review Comment:
   There is a corner case that is not handled:
   The broker has added the servers to the `QueryRoutingTable` but has not routed the query yet. The cancel request might then reach the servers earlier than the query itself, and the broker cannot differentiate this scenario from the query having already completed.
   I haven't thought through whether we need to handle it, but it is worth noting.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
+  }
+
+  @VisibleForTesting
+  Set<ServerInstance> getRunningServers(long queryId) {
+    if (_queriesById.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return _queriesById.get(queryId)._servers;
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses) {
+    QueryRoutingTable routingTable = _queriesById.get(queryId);
+    if (routingTable == null) {
+      return false;
+    }
+    String globalId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>();
+    for (ServerInstance server : routingTable._servers) {
+      serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId));
+    }
+    if (serverUrls.isEmpty()) {
+      LOGGER.debug("No servers running the query: {} right now", globalId);
+      return true;

Review Comment:
   This usually means the routing table has not been calculated yet. We should put a flag in the `QueryRoutingTable` (consider renaming it, because it is not really a routing table) to indicate whether the query has been cancelled, and check that flag before sending the query out.
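   One possible shape for the flag, sketched with a hypothetical `QueryTracker` (a rename of `QueryRoutingTable`) that stores server names as strings. Because the routing and cancel steps synchronize on the same object, either the cancel sees the routed servers and can send them DELETE requests, or the routing sees the flag and never dispatches the query. It does not by itself cover a DELETE overtaking the query on its way to a server.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class QueryTrackerSketch {
  // Hypothetical rename of QueryRoutingTable: tracks routed servers and whether the query was cancelled.
  static final class QueryTracker {
    final String _query;
    private final Set<String> _servers = new HashSet<>();
    private boolean _cancelled;

    QueryTracker(String query) {
      _query = query;
    }

    // Called right before dispatching. Returns false if the query was cancelled first,
    // in which case the broker should not send it to any server.
    synchronized boolean markRouted(Set<String> servers) {
      if (_cancelled) {
        return false;
      }
      _servers.addAll(servers);
      return true;
    }

    // Called by cancelQuery(). Returns the servers that need a DELETE request; an empty set
    // means the query was not routed yet and the flag alone keeps it from being sent.
    synchronized Set<String> cancel() {
      _cancelled = true;
      return Collections.unmodifiableSet(new HashSet<>(_servers));
    }
  }
}
```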



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -167,6 +182,60 @@ private String getDefaultBrokerId() {
     }
   }
 
+  @Override
+  public Map<Long, String> getRunningQueries() {
+    return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
+  }
+
+  @VisibleForTesting
+  Set<ServerInstance> getRunningServers(long queryId) {
+    if (_queriesById.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return _queriesById.get(queryId)._servers;
+  }
+
+  @Override
+  public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
+      Map<String, Integer> serverResponses) {
+    QueryRoutingTable routingTable = _queriesById.get(queryId);
+    if (routingTable == null) {
+      return false;
+    }
+    String globalId = getGlobalQueryId(queryId);
+    List<String> serverUrls = new ArrayList<>();
+    for (ServerInstance server : routingTable._servers) {
+      serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId));
+    }
+    if (serverUrls.isEmpty()) {
+      LOGGER.debug("No servers running the query: {} right now", globalId);
+      return true;
+    }
+    LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls);
+    CompletionService<DeleteMethod> completionService =
+        new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new);
+    for (int i = 0; i < serverUrls.size(); i++) {
+      DeleteMethod deleteMethod = null;
+      try {
+        // Wait for all requests to respond before returning to be sure that the servers have handled the cancel
+        // requests. The completion order is different from serverUrls, thus use uri in the response.
+        deleteMethod = completionService.take().get();
+        URI uri = deleteMethod.getURI();
+        LOGGER.debug("Got response: {} to cancel query: {} via uri: {}", deleteMethod.getStatusCode(), globalId, uri);
+        if (serverResponses != null) {
+          serverResponses.put(uri.getHost(), deleteMethod.getStatusCode());
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to cancel query: {}", globalId, e);

Review Comment:
   This needs to be reflected back to the user. Logging an error is not enough.
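   A sketch of one way to surface failures, with hypothetical names (`CancelErrorReportingSketch`, `cancelOnAll`, `CancelFailedException`): per-server failures are collected, and a single exception carrying the failed servers and the partial responses is thrown, so the REST layer can turn it into an error response. For brevity the requests run sequentially here, whereas the PR sends them in parallel via `MultiHttpRequest`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

class CancelErrorReportingSketch {
  // Carries enough detail for the caller to build an error response.
  static final class CancelFailedException extends Exception {
    final List<String> _failedServers;
    final Map<String, Integer> _partialResponses;

    CancelFailedException(List<String> failedServers, Map<String, Integer> partialResponses, Throwable cause) {
      super("Failed to cancel query on servers: " + failedServers, cause);
      _failedServers = failedServers;
      _partialResponses = partialResponses;
    }
  }

  // Each callable sends one cancel request and returns its HTTP status code.
  static Map<String, Integer> cancelOnAll(Map<String, Callable<Integer>> requestsByServer)
      throws CancelFailedException {
    Map<String, Integer> statusByServer = new LinkedHashMap<>();
    List<String> failedServers = new ArrayList<>();
    Exception firstFailure = null;
    for (Map.Entry<String, Callable<Integer>> entry : requestsByServer.entrySet()) {
      try {
        statusByServer.put(entry.getKey(), entry.getValue().call());
      } catch (Exception e) {
        failedServers.add(entry.getKey());
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
    if (!failedServers.isEmpty()) {
      throw new CancelFailedException(failedServers, statusByServer, firstFailure);
    }
    return statusByServer;
  }
}
```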



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

