Jackie-Jiang commented on code in PR #9171: URL: https://github.com/apache/pinot/pull/9171#discussion_r944896070
########## pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java: ########## @@ -141,6 +152,49 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp } } + @DELETE + @Path("query/{queryId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the " + + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as " + + "it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on the requested broker") + }) + public String cancelQuery( + @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("detailed") @DefaultValue("false") Review Comment: (minor) `verbose` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -167,6 +182,60 @@ private String getDefaultBrokerId() { } } + @Override + public Map<Long, String> getRunningQueries() { + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); + } + + @VisibleForTesting + Set<ServerInstance> getRunningServers(long queryId) { + if (_queriesById.isEmpty()) { + return Collections.emptySet(); + } + return _queriesById.get(queryId)._servers; Review Comment: Can this throw NPE? 
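On the NPE question above: `ConcurrentHashMap.get()` returns `null` for a missing key, so the `isEmpty()` guard only covers the case where no query is tracked at all. A minimal sketch of a null-safe lookup follows. `QueryServersSketch` and `RunningServersLookup` are hypothetical stand-ins for the PR's classes, and servers are plain strings rather than `ServerInstance` to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PR's QueryRoutingTable; servers are plain strings here.
final class QueryServersSketch {
  final String _query;
  final Set<String> _servers = ConcurrentHashMap.newKeySet();

  QueryServersSketch(String query) {
    _query = query;
  }
}

// Hypothetical holder for the per-query map, mirroring _queriesById in the PR.
final class RunningServersLookup {
  private final Map<Long, QueryServersSketch> _queriesById = new ConcurrentHashMap<>();

  void register(long queryId, QueryServersSketch entry) {
    _queriesById.put(queryId, entry);
  }

  // One get() followed by a null check. An isEmpty() check does not help when the map
  // holds other queries, or when the entry is removed between the two calls.
  Set<String> getRunningServers(long queryId) {
    QueryServersSketch entry = _queriesById.get(queryId);
    return entry != null ? entry._servers : Collections.emptySet();
  }
}
```

With this shape, an unknown `queryId` yields an empty set instead of an exception, which may or may not be the behavior wanted for a test-only accessor.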
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -190,7 +259,17 @@ public BrokerResponseNative handleRequest(JsonNode request, @Nullable RequesterI if (sql == null) { throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } - return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext); + if (!_enableQueryCancellation) { + return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext); + } + _queriesById.put(requestId, new QueryRoutingTable(sql.asText())); Review Comment: (minor) We might want to pass in `QueryRoutingTable` in `handleRequest()` to avoid extra lookup. It can also avoid NPE if somehow the request is early removed (probably not possible right now) ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -125,6 +134,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final int _defaultHllLog2m; private final boolean _enableQueryLimitOverride; private final boolean _enableDistinctCountBitmapOverride; + private final Map<Long, QueryRoutingTable> _queriesById = new ConcurrentHashMap<>(); Review Comment: (minor) set it to `null` when query cancellation is disabled ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java: ########## @@ -36,4 +39,19 @@ public interface BrokerRequestHandler { BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception; + + Map<Long, String> getRunningQueries(); + + /** + * Cancel a query as identified by the queryId. This method is non-blocking so the query may still run for a while + * after calling this method. This cancel method can be called multiple times. 
+ * @param queryId the unique Id assigned to the query by the broker + * @param timeoutMs timeout to wait for servers to respond the cancel requests + * @param executor to send cancel requests to servers in parallel + * @param connMgr to provide the http connections + * @param serverResponses to collect cancel responses from all servers if a map is provided + * @return true if there is a running query for the given queryId. + */ + boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, Review Comment: Should it throw `TimeoutException` (and maybe `InterruptedException`) when the server didn't return within the timeout? ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -167,6 +182,60 @@ private String getDefaultBrokerId() { } } + @Override + public Map<Long, String> getRunningQueries() { + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); Review Comment: Check and throw exception when query cancellation is not enabled (with proper exception message). Same for other APIs that require enabling query cancellation. We don't want to return empty map because that can confuse the user. 
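A rough sketch of the check suggested above, assuming the map is left `null` when cancellation is disabled. The class name, the `track()` helper, the exception type (`IllegalStateException`), and the message text are all illustrative choices, not something the PR prescribes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class; only the guard in getRunningQueries() is the point of the sketch.
final class RunningQueriesSketch {
  private final boolean _enableQueryCancellation;
  // Left null when cancellation is disabled, so nothing is tracked in that mode.
  private final Map<Long, String> _queriesById;

  RunningQueriesSketch(boolean enableQueryCancellation) {
    _enableQueryCancellation = enableQueryCancellation;
    _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
  }

  // Illustrative helper: records a query only when cancellation is enabled.
  void track(long queryId, String query) {
    if (_queriesById != null) {
      _queriesById.put(queryId, query);
    }
  }

  // Fails loudly instead of returning an empty map, which a caller could misread
  // as "no queries are running".
  Map<Long, String> getRunningQueries() {
    if (!_enableQueryCancellation) {
      throw new IllegalStateException("Query cancellation is not enabled on this broker");
    }
    return new HashMap<>(_queriesById);
  }
}
```

The same guard would presumably go at the top of `cancelQuery()` and any other API that depends on the tracking map.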
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -167,6 +182,60 @@ private String getDefaultBrokerId() { } } + @Override + public Map<Long, String> getRunningQueries() { + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); + } + + @VisibleForTesting + Set<ServerInstance> getRunningServers(long queryId) { + if (_queriesById.isEmpty()) { + return Collections.emptySet(); + } + return _queriesById.get(queryId)._servers; + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map<String, Integer> serverResponses) { + QueryRoutingTable routingTable = _queriesById.get(queryId); + if (routingTable == null) { + return false; + } + String globalId = getGlobalQueryId(queryId); + List<String> serverUrls = new ArrayList<>(); + for (ServerInstance server : routingTable._servers) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId)); + } + if (serverUrls.isEmpty()) { + LOGGER.debug("No servers running the query: {} right now", globalId); + return true; + } + LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls); + CompletionService<DeleteMethod> completionService = Review Comment: There is a corner case not handled: Broker added the servers to the `QueryRoutingTable`, but not routed the query yet. Now the query cancellation might arrive earlier than the query. To the broker, it cannot differentiate this scenario vs query already completed. I haven't thought through whether we need to handle it, but this is a thing worth notice. 
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -167,6 +182,60 @@ private String getDefaultBrokerId() { } } + @Override + public Map<Long, String> getRunningQueries() { + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); + } + + @VisibleForTesting + Set<ServerInstance> getRunningServers(long queryId) { + if (_queriesById.isEmpty()) { + return Collections.emptySet(); + } + return _queriesById.get(queryId)._servers; + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map<String, Integer> serverResponses) { + QueryRoutingTable routingTable = _queriesById.get(queryId); + if (routingTable == null) { + return false; + } + String globalId = getGlobalQueryId(queryId); + List<String> serverUrls = new ArrayList<>(); + for (ServerInstance server : routingTable._servers) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId)); + } + if (serverUrls.isEmpty()) { + LOGGER.debug("No servers running the query: {} right now", globalId); + return true; Review Comment: This usually means the routing table is not calculated. We should put a flag in the `QueryRoutingTable` (consider renaming it because it is not really routing table) to indicate whether the query is cancelled. We can check that before sending the query out. 
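One possible shape for the flag, as a sketch only. `QueryStateSketch` is a hypothetical replacement name for `QueryRoutingTable`, `DispatchSketch.dispatch()` stands in for the point where the broker sends the query out, and servers are plain strings.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-query state: the query text, the servers it was routed to, and a cancel flag.
final class QueryStateSketch {
  final String _query;
  final Set<String> _servers = ConcurrentHashMap.newKeySet();
  // volatile so a cancel issued on another thread is visible to the dispatching thread.
  private volatile boolean _cancelled;

  QueryStateSketch(String query) {
    _query = query;
  }

  void cancel() {
    _cancelled = true;
  }

  boolean isCancelled() {
    return _cancelled;
  }
}

final class DispatchSketch {
  // Returns true if the query would be sent, false if a cancel arrived first.
  static boolean dispatch(QueryStateSketch state, Set<String> routedServers) {
    state._servers.addAll(routedServers);
    if (state.isCancelled()) {
      return false;
    }
    // The real broker would send the query to routedServers here.
    return true;
  }
}
```

This covers a cancel that arrives before routing is computed. It does not fully close the window described in the earlier comment: a cancel that lands after the flag check but before the servers receive the query can still reach a server first.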
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -167,6 +182,60 @@ private String getDefaultBrokerId() { } } + @Override + public Map<Long, String> getRunningQueries() { + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); + } + + @VisibleForTesting + Set<ServerInstance> getRunningServers(long queryId) { + if (_queriesById.isEmpty()) { + return Collections.emptySet(); + } + return _queriesById.get(queryId)._servers; + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map<String, Integer> serverResponses) { + QueryRoutingTable routingTable = _queriesById.get(queryId); + if (routingTable == null) { + return false; + } + String globalId = getGlobalQueryId(queryId); + List<String> serverUrls = new ArrayList<>(); + for (ServerInstance server : routingTable._servers) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId)); + } + if (serverUrls.isEmpty()) { + LOGGER.debug("No servers running the query: {} right now", globalId); + return true; + } + LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls); + CompletionService<DeleteMethod> completionService = + new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new); + for (int i = 0; i < serverUrls.size(); i++) { + DeleteMethod deleteMethod = null; + try { + // Wait for all requests to respond before returning to be sure that the servers have handled the cancel + // requests. The completion order is different from serverUrls, thus use uri in the response. 
+ deleteMethod = completionService.take().get(); + URI uri = deleteMethod.getURI(); + LOGGER.debug("Got response: {} to cancel query: {} via uri: {}", deleteMethod.getStatusCode(), globalId, uri); + if (serverResponses != null) { + serverResponses.put(uri.getHost(), deleteMethod.getStatusCode()); + } + } catch (Exception e) { + LOGGER.error("Failed to cancel query: {}", globalId, e); Review Comment: This needs to be reflected back to the user. Logging an error is not enough.
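A sketch of how failures and timeouts could be surfaced to the caller instead of only being logged. It uses the JDK's `ExecutorCompletionService` in place of Pinot's `MultiHttpRequest`, and `CancelFanOutSketch`, `CancelResult`, and `cancelOnServers()` are hypothetical names. Each `Callable` is assumed to perform one DELETE against one server.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancelFanOutSketch {
  // Hypothetical result of one cancel call: the server and its HTTP status code.
  static final class CancelResult {
    final String _server;
    final int _status;

    CancelResult(String server, int status) {
      _server = server;
      _status = status;
    }
  }

  // Waits for every cancel call under one shared deadline. A call that fails surfaces as
  // ExecutionException and a missing response as TimeoutException, so the REST layer
  // can report the problem to the user.
  static Map<String, Integer> cancelOnServers(List<Callable<CancelResult>> calls, long timeoutMs,
      Executor executor)
      throws TimeoutException, InterruptedException, ExecutionException {
    CompletionService<CancelResult> completionService = new ExecutorCompletionService<>(executor);
    for (Callable<CancelResult> call : calls) {
      completionService.submit(call);
    }
    long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    Map<String, Integer> statuses = new HashMap<>();
    for (int i = 0; i < calls.size(); i++) {
      long remainingNs = Math.max(deadlineNs - System.nanoTime(), 0L);
      Future<CancelResult> future = completionService.poll(remainingNs, TimeUnit.NANOSECONDS);
      if (future == null) {
        throw new TimeoutException(
            "Timed out waiting for " + (calls.size() - i) + " server(s) to respond to the cancel request");
      }
      CancelResult result = future.get();
      statuses.put(result._server, result._status);
    }
    return statuses;
  }
}
```

Whether the first failure should abort the loop, as it does here, or be recorded per server while the remaining responses are collected is a design choice this sketch does not settle.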