Jackie-Jiang commented on code in PR #9242: URL: https://github.com/apache/pinot/pull/9242#discussion_r951885658
########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java: ########## @@ -118,6 +119,8 @@ public void runJob() { IntermediateResultsBlock mergedBlock; try { mergedBlock = mergeResults(); + } catch (InterruptedException ie) { + throw new QueryCancelledException("Cancelled while merging results blocks", ie); Review Comment: (minor) Consider not including the `InterruptedException` for cleaner response ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java: ########## @@ -76,8 +77,10 @@ protected IntermediateResultsBlock getNextBlock() { assert orderByExpressions != null; if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) { try { - return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, - _executorService).getNextBlock(); + return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService) + .getNextBlock(); + } catch (QueryCancelledException e) { + throw new QueryCancelledException("Cancelled while running MinMaxValueBasedSelectionOrderByCombineOperator", e); Review Comment: (minor) We can simply throw it out because the stack trace should have the original class info ```suggestion throw e; ``` ########## pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java: ########## @@ -154,6 +155,8 @@ public List<Operator> callJob() { Throwable cause = e.getCause(); if (cause instanceof BadQueryRequestException) { throw (BadQueryRequestException) cause; + } else if (e instanceof InterruptedException) { + throw new QueryCancelledException("Cancelled while running CombinePlanNode", e); Review Comment: (minor) Consider not including the `InterruptedException` ########## pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java: ########## @@ -102,11 +94,6 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re _numDroppedLogRateLimiter = RateLimiter.create(1.0d); _numDroppedLogCounter = new AtomicInteger(0); LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate()); - - _enableQueryCancellation = Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY)); Review Comment: (minor) Remove this constant ########## pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java: ########## @@ -172,22 +231,62 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { new Exception(message, cause)); } + /** + * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while + * after calling this method. This method can be called multiple times. + * + * @param queryId a unique Id to find the query + * @return true if a running query exists for the given queryId. + */ + public boolean cancelQuery(String queryId) { + Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server"); Review Comment: (minor) This should be `checkState`, same for other checks ########## pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java: ########## @@ -154,10 +200,23 @@ public void onSuccess(@Nullable byte[] responseBytes) { @Override public void onFailure(Throwable t) { + if (_enableQueryCancellation) { + String queryId = queryRequest.getQueryId(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Remove track of running query: {} on failure", queryId); + } + _queryFuturesById.remove(queryId); + } // Send exception response. - LOGGER.error("Exception while processing instance request", t); + Exception ex = new Exception(t); + if (t instanceof CancellationException) { + LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId()); + ex = (Exception) t; + } else { + LOGGER.error("Exception while processing instance request", t); + } Review Comment: Not introduced in this PR, but suggest avoid wrapping exceptions ```suggestion Exception e; if (t instanceof Exception) { e = (Exception) t; if (e instanceof CancellationException) { LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId()); } else { LOGGER.error("Exception while processing instance request", e); } } else { LOGGER.error("Error while processing instance request", t); e = new Exception(t); } ``` ########## pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java: ########## @@ -54,19 +65,24 @@ * The {@code InstanceRequestHandler} is the Netty inbound handler on Pinot Server side to handle the serialized * instance requests sent from Pinot Broker. */ +@ChannelHandler.Sharable public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class); // TODO: make it configurable private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100; + private final String _instanceName; private final TDeserializer _deserializer; private final QueryScheduler _queryScheduler; private final ServerMetrics _serverMetrics; private final AccessControl _accessControl; + private final boolean _enableQueryCancellation; + private final Map<String, Future<byte[]>> _queryFuturesById = new ConcurrentHashMap<>(); Review Comment: When query cancellation is not enabled, let's set it to `null` to avoid the unused garbage. (Optional) We may also use whether it is `null` to identify whether query cancellation is enabled, this should have slightly better performance -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org