This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch cancel-request
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit d66cc7e2815939d6ba462e5182a71e6eff8e4c5a
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sun Jul 31 23:50:56 2022 -0700

    Initial commit to support cancelling a long running query
---
 .../broker/api/resources/PinotClientRequest.java   | 16 +++++
 .../requesthandler/BaseBrokerRequestHandler.java   | 23 ++++++-
 .../requesthandler/BrokerRequestHandler.java       |  2 +
 .../BrokerRequestHandlerDelegate.java              |  6 +-
 .../apache/pinot/core/operator/BaseOperator.java   |  7 ++
 .../core/operator/combine/BaseCombineOperator.java |  8 +++
 .../apache/pinot/core/util/trace/TraceContext.java | 23 +++++++
 .../pinot/server/api/resources/QueryResource.java  | 80 ++++++++++++++++++++++
 8 files changed, 163 insertions(+), 2 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 9bc1b466c0..376a847256 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -115,6 +115,22 @@ public class PinotClientRequest {
     }
   }
 
+  /**
+   * Asks the request handler to cancel the query with the given request id and completes the asynchronous
+   * response with whatever the handler returns.
+   *
+   * @param requestId id of the request to cancel, read from the "RequestId" query parameter
+   * @param asyncResponse suspended response that is resumed with the handler's answer or with the failure
+   * @param requestContext HTTP request context (currently unused)
+   */
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("query/cancel")
+  @ApiOperation(value = "Cancel a running query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Cancellation response"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public void cancelSqlQuery(@ApiParam(value = "RequestId", required = true) @QueryParam("RequestId") long requestId,
+      @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    // TODO: Should we get the query as well? Probably not needed if the broker maintains a
+    //       requestId <> RequestContext mapping for the running queries.
+    try {
+      asyncResponse.resume(_requestHandler.cancelRequest(requestId));
+    } catch (Exception e) {
+      // Always resume the suspended response; otherwise a failure here would leave it uncompleted
+      asyncResponse.resume(e);
+    }
+  }
+
   @POST
   @ManagedAsync
   @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 3f2bd37322..9c91eb8572 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -125,6 +126,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   private final boolean _enableQueryLimitOverride;
   private final boolean _enableDistinctCountBitmapOverride;
 
+
   public BaseBrokerRequestHandler(PinotConfiguration config, 
BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics) {
@@ -189,7 +191,23 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (sql == null) {
       throw new BadQueryRequestException("Failed to find 'sql' in the request: 
" + request);
     }
-    return handleRequest(requestId, sql.asText(), request, requesterIdentity, 
requestContext);
+    BrokerResponseNative brokerResponseNative =
+        handleRequest(requestId, sql.asText(), request, requesterIdentity, 
requestContext);
+    return brokerResponseNative;
+  }
+
+  /**
+   * Placeholder for query cancellation: no cancel request is sent anywhere yet and an empty string is returned.
+   *
+   * @param requestId id of the request to cancel
+   * @return an empty string, or an error message if an exception is caught
+   */
+  @Override
+  public String cancelRequest(long requestId) {
+    String result;
+    try {
+      // TODO: Send the cancel request to all servers. This could be optimized by asking the user to provide the
+      //       query again or the list of tables. Assuming this won't be called frequently, invoking it on all
+      //       servers is not a big overhead.
+      result = "";
+    } catch (Exception cancelFailure) {
+      result = "Exception while trying to cancel request: " + requestId + ". " + cancelFailure.getMessage();
+    }
+    return result;
   }
 
   private BrokerResponseNative handleRequest(long requestId, String query, 
JsonNode request,
@@ -1472,6 +1490,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     if (enableTrace) {
       queryOptions.put(Broker.Request.TRACE, "true");
     }
+
+    queryOptions.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
String.valueOf(requestId));
+
     // NOTE: Always set query options because we will put 'timeoutMs' later
     pinotQuery.setQueryOptions(queryOptions);
     if (!queryOptions.isEmpty()) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 93591dee71..75e823ca6c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -36,4 +36,6 @@ public interface BrokerRequestHandler {
   BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity 
requesterIdentity,
       RequestContext requestContext)
       throws Exception;
+
+  /**
+   * Cancels the request identified by the given request id.
+   *
+   * @param requestId id of the request to cancel
+   * @return an implementation-defined status message
+   */
+  String cancelRequest(long requestId);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index 046e60c04f..2fcd849611 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -40,7 +40,6 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
 
   private final boolean _isMultiStageQueryEngineEnabled;
 
-
   public BrokerRequestHandlerDelegate(
       BrokerRequestHandler singleStageBrokerRequestHandler,
       @Nullable MultiStageBrokerRequestHandler multiStageWorkerRequestHandler
@@ -85,4 +84,9 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
     }
     return _singleStageBrokerRequestHandler.handleRequest(request, 
requesterIdentity, requestContext);
   }
+
+  @Override
+  public String cancelRequest(long requestId) {
+    return _singleStageBrokerRequestHandler.cancelRequest(requestId);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index d4c8772150..533613df9c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -36,8 +36,15 @@ public abstract class BaseOperator<T extends Block> 
implements Operator<T> {
       throw new EarlyTerminationException();
     }
     try (InvocationScope ignored = 
Tracing.getTracer().createScope(getClass())) {
+      try {
+        Thread.sleep(100000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
       return getNextBlock();
     }
+
+
   }
 
   // Make it protected because we should always call nextBlock()
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 44fdd7cdbf..832877a26a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.ThreadTimer;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.Tracing;
@@ -73,6 +74,13 @@ public abstract class BaseCombineOperator extends 
BaseOperator<IntermediateResul
     //       The parallelism is bounded by the task count.
     _numTasks = CombineOperatorUtils.getNumTasksForQuery(operators.size(), 
queryContext.getMaxExecutionThreads());
     _futures = new Future[_numTasks];
+
+    try {
+      long requestId = 
Long.parseLong(queryContext.getQueryOptions().get("requestId"));
+      TraceContext.register(requestId, queryContext, _futures);
+    } catch (Exception e) {
+      //ignore.. this will only mean that we cannot cancel the query.. this 
can happen if requestId is null or not a long etc
+    }
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
index 4d430434e5..da414fd1fc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
@@ -22,13 +22,17 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -45,9 +49,28 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * request from tracing to prevent resource leak.
  */
 public final class TraceContext {
+
+  static Map<Long, Pair<QueryContext, Future[]>> requestId2FuturesMap = new 
ConcurrentHashMap<>();
+
   private TraceContext() {
   }
 
+  public static void register(long requestId, QueryContext queryContext, 
Future[] futures) {
+    requestId2FuturesMap.put(requestId, Pair.of(queryContext, futures));
+  }
+
+  /**
+   * Cancels, with interruption, every not-yet-completed future registered for the given request id.
+   * Does nothing when no entry is registered for the request id or the registered future array is null.
+   *
+   * @param requestId id of the request whose futures should be cancelled
+   */
+  public static void cancel(long requestId) {
+    Pair<QueryContext, Future[]> registered = requestId2FuturesMap.get(requestId);
+    if (registered == null || registered.getRight() == null) {
+      // Unknown request id, or nothing was registered to cancel
+      return;
+    }
+    for (Future<?> future : registered.getRight()) {
+      // Array slots can still be null when the array was registered before its tasks were submitted
+      if (future != null && !future.isDone()) {
+        future.cancel(true);
+      }
+    }
+  }
+
   /**
    * Trace represents the logs for a single thread.
    */
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
new file mode 100644
index 0000000000..cd9011d0ac
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * Query management resource for Pinot Server; currently exposes cancellation of a running request.
+ */
+@Api(tags = "Query", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/query/")
+public class QueryResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  /**
+   * Cancels the futures registered for the given request id through {@link TraceContext#cancel(long)}.
+   *
+   * @param requestId id of the request to cancel, taken from the last path segment
+   * @return an acknowledgement message containing the request id
+   */
+  // NOTE(review): combined with the class-level path this resolves to "/query/query/cancel/{requestId}"; confirm
+  // that the duplicated "query" segment is intended. The method name looks copied from the debug resource.
+  @GET
+  @Path("query/cancel/{requestId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Cancel a request",
+      notes = "This will cancel all the threads for a specific request")
+  public String getSegmentsDebugInfo(
+      @ApiParam(value = "RequestId to cancel", required = true) @PathParam("requestId") long requestId) {
+    TraceContext.cancel(requestId);
+    return "Cancel requested for requestId: " + requestId;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to