jackjlli commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r940773616


##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -103,7 +115,60 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
    * @return Listenable future for query result representing serialized response. It is possible that the
    *    future may return immediately or be scheduled for execution at a later time.
    */
-  public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    ListenableFuture<byte[]> future = submitInternal(queryRequest);
+    if (_enableQueryCancellation) {
+      String queryId = queryRequest.getQueryId();
+      // Track the running query for cancellation.
+      LOGGER.debug("Keep track of running query: {}", queryId);
+      _queryFuturesById.put(queryId, future);
+      // And remove the track when the query ends.
+      Futures.addCallback(future, new FutureCallback<byte[]>() {
+        @Override
+        public void onSuccess(@Nullable byte[] ignored) {
+          LOGGER.debug("Remove track of running query: {} on success", queryId);

Review Comment:
   Same here and the other places in this class.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -103,7 +115,60 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
    * @return Listenable future for query result representing serialized response. It is possible that the
    *    future may return immediately or be scheduled for execution at a later time.
    */
-  public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    ListenableFuture<byte[]> future = submitInternal(queryRequest);
+    if (_enableQueryCancellation) {
+      String queryId = queryRequest.getQueryId();
+      // Track the running query for cancellation.
+      LOGGER.debug("Keep track of running query: {}", queryId);

Review Comment:
   Suggest checking the log level in an `if` statement before logging here, 
rather than calling the logger directly.
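A minimal sketch of the guard being suggested, not the PR's actual code. To stay dependency-free it uses `java.util.logging` as a stand-in, where the check is `isLoggable(Level.FINE)`; with SLF4J the equivalent guard would be `LOGGER.isDebugEnabled()`. The class name, the `describe` helper, the counter, and the query ids are all hypothetical and exist only to show that the message is not built when the level is disabled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedLoggingSketch {
  // Stand-in for the scheduler's LOGGER (assumption: java.util.logging instead of SLF4J).
  private static final Logger LOGGER = Logger.getLogger(GuardedLoggingSketch.class.getName());

  // Counts how many times a log message was actually constructed.
  static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

  // Hypothetical helper standing in for any non-trivial message construction.
  static String describe(String queryId) {
    MESSAGES_BUILT.incrementAndGet();
    return "Keep track of running query: " + queryId;
  }

  static void trackQuery(String queryId) {
    // Check the level first so the message is only built when it will be emitted.
    if (LOGGER.isLoggable(Level.FINE)) {
      LOGGER.fine(describe(queryId));
    }
  }

  public static void main(String[] args) {
    LOGGER.setLevel(Level.INFO);   // debug-level logging disabled: message is skipped
    trackQuery("broker0_1");
    LOGGER.setLevel(Level.FINE);   // debug-level logging enabled: message is built
    trackQuery("broker0_2");
    System.out.println("messages built: " + MESSAGES_BUILT.get());
  }
}
```

Whether the guard pays off depends on how expensive the arguments are; for a plain `{}` placeholder with an existing string the saving is small.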



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -141,6 +152,50 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
     }
   }
 
+  @DELETE
+  @Path("query/{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+      + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
+      + "it's done in a non-blocking manner. The cancel method can be called multiple times.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
+      @ApiResponse(code = 404, message = "Query not found on the requested broker")
+  })
+  public String cancelQuery(
+      @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>", required = true) @PathParam("queryId")
+          String queryId,
+      @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
+      @DefaultValue("3000") int timeoutMs,
+      @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("detailed") @DefaultValue("false")
+          boolean detailed) {
+    Map<String, Integer> serverResponses = null;
+    if (detailed) {
+      serverResponses = new HashMap<>();
+    }
+    if (!_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
+      throw new WebApplicationException(
+          Response.status(Response.Status.NOT_FOUND).entity("Query: " + queryId + " not found on the broker").build());
+    }
+    String resp = "Cancelled query: " + queryId;
+    if (detailed) {
+      resp += " with responses from servers: " + serverResponses;
+    }
+    return resp;
+  }
+
+  @GET
+  @Path("queries")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get queryIds of the running queries submitted via this broker", notes = "QueryIds are in the"
+      + " format of <brokerId>_<requestId>")

Review Comment:
   Since the queryId is formed as `<brokerId>_<requestId>`, whichever broker 
receives this request will always return its own brokerId as the prefix. 
Does that mean the user needs to know the list of broker ids beforehand? If 
so, what's the purpose of returning the broker id in the response? Wouldn't it 
be better to do this in a controller endpoint?
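To make the question concrete, here is a rough sketch of how a caller might pull the broker id out of a queryId in the `<brokerId>_<requestId>` format. It is not code from the PR. It assumes the request id contains no underscore, so the last `_` is the separator; the class name, helper names, and the sample id are hypothetical.

```java
public class QueryIdSketch {
  // Assumption: the request id has no underscore, so the last '_' separates the two parts.
  static int separatorIndex(String queryId) {
    int sep = queryId.lastIndexOf('_');
    if (sep <= 0 || sep == queryId.length() - 1) {
      throw new IllegalArgumentException("Not in <brokerId>_<requestId> format: " + queryId);
    }
    return sep;
  }

  // Everything before the last underscore is treated as the broker id.
  static String brokerIdOf(String queryId) {
    return queryId.substring(0, separatorIndex(queryId));
  }

  // Everything after the last underscore is treated as the request id.
  static String requestIdOf(String queryId) {
    return queryId.substring(separatorIndex(queryId) + 1);
  }

  public static void main(String[] args) {
    String queryId = "Broker_host1_8099_12345"; // hypothetical sample id
    System.out.println(brokerIdOf(queryId));
    System.out.println(requestIdOf(queryId));
  }
}
```

A controller endpoint could do this routing on the user's behalf, which is the alternative raised above.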



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

