This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch cancel-request in repository https://gitbox.apache.org/repos/asf/pinot.git
commit d66cc7e2815939d6ba462e5182a71e6eff8e4c5a Author: kishoreg <g.kish...@gmail.com> AuthorDate: Sun Jul 31 23:50:56 2022 -0700 Initial commit to support cancelling a long running query --- .../broker/api/resources/PinotClientRequest.java | 16 +++++ .../requesthandler/BaseBrokerRequestHandler.java | 23 ++++++- .../requesthandler/BrokerRequestHandler.java | 2 + .../BrokerRequestHandlerDelegate.java | 6 +- .../apache/pinot/core/operator/BaseOperator.java | 7 ++ .../core/operator/combine/BaseCombineOperator.java | 8 +++ .../apache/pinot/core/util/trace/TraceContext.java | 23 +++++++ .../pinot/server/api/resources/QueryResource.java | 80 ++++++++++++++++++++++ 8 files changed, 163 insertions(+), 2 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 9bc1b466c0..376a847256 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -115,6 +115,22 @@ public class PinotClientRequest { } } + @GET + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Path("query/cancel") + @ApiOperation(value = "Querying pinot") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Query response"), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public void cancelSqlQuery(@ApiParam(value = "RequestId", required = true) @QueryParam("RequestId") long requestId, + @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) { + //todo: should we get the query as well.. 
probably not needed if the broker maintains requestId <> RequestContext for the running queries + String response = _requestHandler.cancelRequest(requestId); + asyncResponse.resume(response); + + } @POST @ManagedAsync @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 3f2bd37322..9c91eb8572 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -125,6 +126,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final boolean _enableQueryLimitOverride; private final boolean _enableDistinctCountBitmapOverride; + public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) { @@ -189,7 +191,23 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (sql == null) { throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } - return handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext); + BrokerResponseNative brokerResponseNative = + handleRequest(requestId, sql.asText(), request, requesterIdentity, requestContext); + return brokerResponseNative; + } + + @Override + public String cancelRequest(long requestId) { + try { + + //we will send the 
cancel request to all servers.. we can probably optimize by asking the user to provide the query again or the list of tables + //Assuming this won't be frequently called.. invoking this on all servers is not a big overhead + + return ""; + + } catch (Exception e) { + return "Exception while trying to cancel request: " + requestId + ". " + e.getMessage(); + } } private BrokerResponseNative handleRequest(long requestId, String query, JsonNode request, @@ -1472,6 +1490,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (enableTrace) { queryOptions.put(Broker.Request.TRACE, "true"); } + + queryOptions.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId)); + // NOTE: Always set query options because we will put 'timeoutMs' later pinotQuery.setQueryOptions(queryOptions); if (!queryOptions.isEmpty()) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 93591dee71..75e823ca6c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -36,4 +36,6 @@ public interface BrokerRequestHandler { BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception; + + String cancelRequest(long requestId); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index 046e60c04f..2fcd849611 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -40,7 +40,6 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { private final boolean _isMultiStageQueryEngineEnabled; - public BrokerRequestHandlerDelegate( BrokerRequestHandler singleStageBrokerRequestHandler, @Nullable MultiStageBrokerRequestHandler multiStageWorkerRequestHandler @@ -85,4 +84,9 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { } return _singleStageBrokerRequestHandler.handleRequest(request, requesterIdentity, requestContext); } + + @Override + public String cancelRequest(long requestId) { + return _singleStageBrokerRequestHandler.cancelRequest(requestId); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java index d4c8772150..533613df9c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java @@ -36,8 +36,15 @@ public abstract class BaseOperator<T extends Block> implements Operator<T> { throw new EarlyTerminationException(); } try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) { + try { + Thread.sleep(100000); + } catch (InterruptedException e) { + e.printStackTrace(); + } return getNextBlock(); } + + } // Make it protected because we should always call nextBlock() diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 44fdd7cdbf..832877a26a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -35,6 +35,7 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.ThreadTimer; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.trace.TraceContext; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.Tracing; @@ -73,6 +74,13 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul // The parallelism is bounded by the task count. _numTasks = CombineOperatorUtils.getNumTasksForQuery(operators.size(), queryContext.getMaxExecutionThreads()); _futures = new Future[_numTasks]; + + try { + long requestId = Long.parseLong(queryContext.getQueryOptions().get("requestId")); + TraceContext.register(requestId, queryContext, _futures); + } catch (Exception e) { + //ignore.. this will only mean that we cannot cancel the query.. 
this can happen if requestId is null or not a long etc + } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java index 4d430434e5..da414fd1fc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java @@ -22,13 +22,17 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.spi.utils.JsonUtils; @@ -45,9 +49,28 @@ import org.apache.pinot.spi.utils.JsonUtils; * request from tracing to prevent resource leak. */ public final class TraceContext { + + static Map<Long, Pair<QueryContext, Future[]>> requestId2FuturesMap = new ConcurrentHashMap<>(); + private TraceContext() { } + public static void register(long requestId, QueryContext queryContext, Future[] futures) { + requestId2FuturesMap.put(requestId, Pair.of(queryContext, futures)); + } + + public static void cancel(long requestId) { + Pair<QueryContext, Future[]> queryContextFuturesPair = requestId2FuturesMap.get(requestId); + if (queryContextFuturesPair.getRight() != null) { + // Cancel all ongoing jobs + for (Future future : queryContextFuturesPair.getRight()) { + if (!future.isDone()) { + future.cancel(true); + } + } + } + } + /** * Trace represents the logs for a single thread. 
*/ diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java new file mode 100644 index 0000000000..cd9011d0ac --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo; +import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.core.util.trace.TraceContext; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +/** + * Debug resource for Pinot Server. 
+ */ +@Api(tags = "Query", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name = + HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY))) +@Path("/query/") +public class QueryResource { + + @Inject + private ServerInstance _serverInstance; + + @GET + @Path("query/cancel") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a request", + notes = "This will cancel all the threads for a specific request") + public String getSegmentsDebugInfo( + @ApiParam(value = "RequestId to cancel", required = true) @PathParam("requestId") long requestId) { + TraceContext.cancel(requestId); + return "successfully deleted requestId"; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org