Jackie-Jiang commented on code in PR #9242:
URL: https://github.com/apache/pinot/pull/9242#discussion_r951885658


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java:
##########
@@ -118,6 +119,8 @@ public void runJob() {
     IntermediateResultsBlock mergedBlock;
     try {
       mergedBlock = mergeResults();
+    } catch (InterruptedException ie) {
+      throw new QueryCancelledException("Cancelled while merging results blocks", ie);

Review Comment:
   (minor) Consider not including the `InterruptedException` as the cause, for a cleaner response
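
A minimal sketch of what this suggestion could look like, assuming a hypothetical stand-in `QueryCancelledException` with a message-only constructor and a hypothetical `InterruptibleStep` interface (neither is taken from Pinot; the real classes may differ). The interrupt is translated into a cancellation exception without chaining the `InterruptedException`, so the response carries only the short message:

```java
class CancelWithoutCauseSketch {
  // Hypothetical stand-in for Pinot's QueryCancelledException; the real class may differ.
  static class QueryCancelledException extends RuntimeException {
    QueryCancelledException(String message) {
      super(message);
    }
  }

  // Hypothetical functional interface for a step that can be interrupted.
  interface InterruptibleStep<T> {
    T run() throws InterruptedException;
  }

  // Runs the step and translates an interrupt into a cancellation without a cause.
  static <T> T mergeOrCancel(InterruptibleStep<T> step) {
    try {
      return step.run();
    } catch (InterruptedException ie) {
      // The InterruptedException is intentionally not chained as the cause.
      throw new QueryCancelledException("Cancelled while merging results blocks");
    }
  }

  public static void main(String[] args) {
    try {
      mergeOrCancel(() -> {
        throw new InterruptedException("simulated interrupt");
      });
    } catch (QueryCancelledException e) {
      System.out.println(e.getMessage() + ", cause=" + e.getCause());
    }
  }
}
```

The trade-off is that the interrupt's own stack trace is no longer available from the exception, which seems acceptable here because the message already says where the cancellation was observed.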



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java:
##########
@@ -76,8 +77,10 @@ protected IntermediateResultsBlock getNextBlock() {
     assert orderByExpressions != null;
     if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
       try {
-        return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext,
-            _executorService).getNextBlock();
+        return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService)
+            .getNextBlock();
+      } catch (QueryCancelledException e) {
+        throw new QueryCancelledException("Cancelled while running MinMaxValueBasedSelectionOrderByCombineOperator", e);

Review Comment:
   (minor) We can simply rethrow it, because the stack trace should already have the original class info
   ```suggestion
           throw e;
   ```
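
A small sketch of why rethrowing is enough, assuming hypothetical stand-ins (`QueryCancelledException`, `innerOperator`, `outerOperator`) rather than the real Pinot classes. Rethrowing the caught exception keeps the same object, so its stack trace still points at the place where it was created:

```java
class RethrowSketch {
  // Hypothetical stand-in for Pinot's QueryCancelledException; the real class may differ.
  static class QueryCancelledException extends RuntimeException {
    QueryCancelledException(String message) {
      super(message);
    }
  }

  // Stands in for the delegate operator that detects the cancellation.
  static void innerOperator() {
    throw new QueryCancelledException("Cancelled in inner operator");
  }

  // Rethrows the same exception object instead of wrapping it in a new one.
  static void outerOperator() {
    try {
      innerOperator();
    } catch (QueryCancelledException e) {
      throw e;
    }
  }

  public static void main(String[] args) {
    try {
      outerOperator();
    } catch (QueryCancelledException e) {
      // The top stack frame still names the method that created the exception.
      System.out.println(e.getStackTrace()[0].getMethodName());
    }
  }
}
```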



##########
pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java:
##########
@@ -154,6 +155,8 @@ public List<Operator> callJob() {
         Throwable cause = e.getCause();
         if (cause instanceof BadQueryRequestException) {
           throw (BadQueryRequestException) cause;
+        } else if (e instanceof InterruptedException) {
+          throw new QueryCancelledException("Cancelled while running CombinePlanNode", e);

Review Comment:
   (minor) Consider not including the `InterruptedException`



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -102,11 +94,6 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
     _numDroppedLogRateLimiter = RateLimiter.create(1.0d);
     _numDroppedLogCounter = new AtomicInteger(0);
     LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
-
-    _enableQueryCancellation = Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY));

Review Comment:
   (minor) Remove the `ENABLE_QUERY_CANCELLATION_KEY` constant as well



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java:
##########
@@ -172,22 +231,62 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         new Exception(message, cause));
   }
 
+  /**
+   * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while
+   * after calling this method. This method can be called multiple times.
+   *
+   * @param queryId a unique Id to find the query
+   * @return true if a running query exists for the given queryId.
+   */
+  public boolean cancelQuery(String queryId) {
+    Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");

Review Comment:
   (minor) This should be `checkState`, because it validates the server's state rather than an argument. Same for the other checks
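
To illustrate the distinction: Guava's `Preconditions.checkState` throws `IllegalStateException`, while `Preconditions.checkArgument` throws `IllegalArgumentException`. The sketch below uses plain Java so that it stays self-contained; the class name and the `queryId` check are hypothetical and not part of the PR:

```java
class CheckStateSketch {
  private final boolean _enableQueryCancellation;

  CheckStateSketch(boolean enableQueryCancellation) {
    _enableQueryCancellation = enableQueryCancellation;
  }

  boolean cancelQuery(String queryId) {
    // State check: the caller passed nothing wrong, the server is just not configured for this.
    // Throws the same exception type as Guava's Preconditions.checkState.
    if (!_enableQueryCancellation) {
      throw new IllegalStateException("Query cancellation is not enabled on server");
    }
    // Argument check (hypothetical, for contrast): the input itself is invalid.
    // Throws the same exception type as Guava's Preconditions.checkArgument.
    if (queryId == null || queryId.isEmpty()) {
      throw new IllegalArgumentException("queryId must be non-empty");
    }
    // No running queries are tracked in this sketch.
    return false;
  }

  public static void main(String[] args) {
    try {
      new CheckStateSketch(false).cancelQuery("q1");
    } catch (IllegalStateException e) {
      System.out.println("state problem: " + e.getMessage());
    }
  }
}
```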



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java:
##########
@@ -154,10 +200,23 @@ public void onSuccess(@Nullable byte[] responseBytes) {
 
       @Override
       public void onFailure(Throwable t) {
+        if (_enableQueryCancellation) {
+          String queryId = queryRequest.getQueryId();
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Remove track of running query: {} on failure", queryId);
+          }
+          _queryFuturesById.remove(queryId);
+        }
         // Send exception response.
-        LOGGER.error("Exception while processing instance request", t);
+        Exception ex = new Exception(t);
+        if (t instanceof CancellationException) {
+          LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId());
+          ex = (Exception) t;
+        } else {
+          LOGGER.error("Exception while processing instance request", t);
+        }

Review Comment:
   Not introduced in this PR, but I suggest we avoid wrapping `t` when it is already an `Exception`
   ```suggestion
           Exception e;
           if (t instanceof Exception) {
             e = (Exception) t;
             if (e instanceof CancellationException) {
               LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId());
             } else {
               LOGGER.error("Exception while processing instance request", e);
             }
           } else {
             LOGGER.error("Error while processing instance request", t);
             e = new Exception(t);
           }
   ```
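
The core of the suggestion, without the logging, can be sketched as a standalone helper. The class and method names here (`UnwrapSketch`, `toException`) are hypothetical and only serve the illustration:

```java
import java.util.concurrent.CancellationException;

class UnwrapSketch {
  // Returns the throwable itself when it already is an Exception, and wraps it only otherwise.
  static Exception toException(Throwable t) {
    if (t instanceof Exception) {
      return (Exception) t;
    }
    // Errors and other non-Exception throwables still need a wrapper.
    return new Exception(t);
  }

  public static void main(String[] args) {
    CancellationException cancelled = new CancellationException("query cancelled");
    // Same object comes back, so no extra wrapper layer reaches the response.
    System.out.println(toException(cancelled) == cancelled);

    Error error = new Error("boom");
    // A non-Exception throwable is wrapped, with the original kept as the cause.
    System.out.println(toException(error).getCause() == error);
  }
}
```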



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java:
##########
@@ -54,19 +65,24 @@
  * The {@code InstanceRequestHandler} is the Netty inbound handler on Pinot Server side to handle the serialized
  * instance requests sent from Pinot Broker.
  */
+@ChannelHandler.Sharable
 public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
   private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
 
   // TODO: make it configurable
   private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
 
+  private final String _instanceName;
   private final TDeserializer _deserializer;
   private final QueryScheduler _queryScheduler;
   private final ServerMetrics _serverMetrics;
   private final AccessControl _accessControl;
+  private final boolean _enableQueryCancellation;
+  private final Map<String, Future<byte[]>> _queryFuturesById = new ConcurrentHashMap<>();

Review Comment:
   When query cancellation is not enabled, let's set it to `null` to avoid allocating an unused map.
   (Optional) We may also use whether it is `null` to identify whether query cancellation is enabled.
   This should have slightly better performance
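
One way this could look, as a rough sketch: the map is only allocated when cancellation is enabled, and a `null` check replaces the separate boolean flag. The class name and the `track` method are hypothetical; only `_queryFuturesById` and `cancelQuery` come from the diff above:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

class NullableTrackingSketch {
  // null when query cancellation is disabled, so no unused map is allocated.
  private final Map<String, Future<byte[]>> _queryFuturesById;

  NullableTrackingSketch(boolean enableQueryCancellation) {
    _queryFuturesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
  }

  // Hypothetical helper: remembers a running query only when tracking is enabled.
  void track(String queryId, Future<byte[]> future) {
    if (_queryFuturesById != null) {
      _queryFuturesById.put(queryId, future);
    }
  }

  // Returns true if a tracked query exists for the given queryId.
  boolean cancelQuery(String queryId) {
    if (_queryFuturesById == null) {
      throw new IllegalStateException("Query cancellation is not enabled on server");
    }
    Future<byte[]> future = _queryFuturesById.get(queryId);
    if (future == null) {
      return false;
    }
    future.cancel(true);
    return true;
  }

  public static void main(String[] args) {
    NullableTrackingSketch handler = new NullableTrackingSketch(true);
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    handler.track("q1", future);
    System.out.println(handler.cancelQuery("q1") + " " + future.isCancelled());
  }
}
```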



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

