gortiz commented on code in PR #16142:
URL: https://github.com/apache/pinot/pull/16142#discussion_r2197712214


##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -106,6 +107,8 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
     protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>();
     protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>();
 
+    protected final ConcurrentHashMap<String, QueryCancelCallback> _queryCancelCallbacks = new ConcurrentHashMap<>();

Review Comment:
   Are we 100% sure this is going to be cleaned up, even in case of error? 
Otherwise it would be better to use a Guava Cache, which will be cleaned after 
a while by other threads when new values are added.
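
   As a rough illustration of the cleanup concern, here is a minimal sketch 
that unregisters the callback when the execution future completes, whether it 
succeeds or fails. `CallbackCleanupSketch`, `CALLBACKS` and `track` are 
hypothetical names, and `Runnable` stands in for `QueryCancelCallback`; this is 
not the PR's code, and it uses a plain `ConcurrentHashMap` rather than a Guava 
Cache.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CallbackCleanupSketch {
  // Hypothetical stand-in for _queryCancelCallbacks; Runnable replaces QueryCancelCallback.
  static final ConcurrentHashMap<String, Runnable> CALLBACKS = new ConcurrentHashMap<>();

  // Registers a cancel callback and unregisters it once the execution future completes,
  // whether normally, exceptionally, or through cancellation.
  static CompletableFuture<Void> track(String queryId, CompletableFuture<Void> execution) {
    CALLBACKS.put(queryId, () -> execution.cancel(true));
    return execution.whenComplete((result, error) -> CALLBACKS.remove(queryId));
  }

  public static void main(String[] args) {
    CompletableFuture<Void> failing = new CompletableFuture<>();
    track("q1", failing);
    // Simulate a query that ends with an error.
    failing.completeExceptionally(new IllegalStateException("query failed"));
    // The entry has been removed even though the query failed.
    System.out.println(CALLBACKS.containsKey("q1"));
  }
}
```

   Tying the removal to the future's completion means no code path has to 
remember to unregister explicitly.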



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryCancelCallback.java:
##########
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+public interface QueryCancelCallback {
+  /**
+   * Cancels the query with the given queryId.
+   */
+  void cancelQuery();

Review Comment:
   I like the idea of using an interface here, but I think we need to specify 
whether this method is blocking (and therefore once the call finishes the query 
is guaranteed to be killed) or whether it could be async.
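
   To make the point concrete, this is one way the contract could be spelled 
out in the Javadoc. The non-blocking semantics below are only an assumption 
chosen for illustration (the PR may well prefer blocking), and `FlagQuery` is a 
hypothetical toy implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelContractSketch {
  /** Hypothetical copy of the interface with the contract written into the Javadoc. */
  interface QueryCancelCallback {
    /**
     * Requests cancellation of the query this callback was registered for.
     *
     * <p>Assumed contract (for illustration only): the call does not block. It only signals the
     * query to stop, so the query may still be running when this method returns.
     */
    void cancelQuery();
  }

  // Toy query that would poll a flag; cancelling just flips the flag and returns.
  static final class FlagQuery implements QueryCancelCallback {
    final AtomicBoolean cancelRequested = new AtomicBoolean(false);

    @Override
    public void cancelQuery() {
      cancelRequested.set(true);
    }
  }

  public static void main(String[] args) {
    FlagQuery query = new FlagQuery();
    query.cancelQuery();
    // The request has been recorded; nothing here waits for the query to actually stop.
    System.out.println(query.cancelRequested.get());
  }
}
```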



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -814,14 +837,21 @@ void killAllQueries() {
 
         if (config.isOomKillQueryEnabled()) {
           int killedCount = 0;
+          Set<String> terminatedQueryIds = new HashSet<>();
           for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
             CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
             CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus();
-            if (taskEntry != null && taskEntry.isAnchorThread()) {
-              threadEntry._errorStatus
-                  .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType)));
-              taskEntry.getAnchorThread().interrupt();
-              killedCount += 1;
+            if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) {
+              if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) {
+                _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery();
+                terminatedQueryIds.add(taskEntry.getQueryId());
+              } else if (taskEntry.isAnchorThread()) {

Review Comment:
   Do we still need this older option?



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -879,7 +909,11 @@ private void killMostExpensiveQuery() {
                 .set(new RuntimeException(String.format(
                     " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
                     maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
-            interruptRunnerThread(maxUsageTuple.getAnchorThread());
+            if (_queryCancelCallbacks.containsKey(maxUsageTuple.getQueryId())) {
+              _queryCancelCallbacks.get(maxUsageTuple.getQueryId()).cancelQuery();

Review Comment:
   Same race condition here



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -905,7 +939,11 @@ private void killCPUTimeExceedQueries() {
                 String.format("Query %s got killed on %s: %s because using %d "
                         + "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId,
                     value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS())));
-            interruptRunnerThread(value.getAnchorThread());
+            if (_queryCancelCallbacks.containsKey(value.getQueryId())) {
+              _queryCancelCallbacks.get(value.getQueryId()).cancelQuery();

Review Comment:
   And here



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -267,6 +269,11 @@ public void shutDown() {
   /// If any error happened during the asynchronous execution, an error block will be sent to all receiver mailboxes.
   public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
       Map<String, String> requestMetadata, @Nullable ThreadExecutionContext parentContext) {
+    String requestIdStr = Long.toString(QueryThreadContext.getRequestId());
+    long requestId = QueryThreadContext.getRequestId();
+
+    _resourceUsageAccountant.registerQueryCancelCallback(requestIdStr, () -> _opChainScheduler.cancel(requestId));

Review Comment:
   Not super important, but this allocates a new callback instance per query.
    
   Instead, we could change the signature of `QueryCancelCallback` to receive 
the request ID as an argument; then we could create a single callback instance 
and reuse it.
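
   A minimal sketch of that idea, assuming a hypothetical 
`cancelQuery(long requestId)` signature. `FakeScheduler` is a made-up stand-in 
for the op-chain scheduler and only records the ids it is asked to cancel.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedCallbackSketch {
  /** Hypothetical signature: the request id is passed in instead of being captured. */
  interface QueryCancelCallback {
    void cancelQuery(long requestId);
  }

  // Hypothetical stand-in for the op-chain scheduler; it only records cancelled ids.
  static final class FakeScheduler {
    final List<Long> cancelled = new ArrayList<>();

    void cancel(long requestId) {
      cancelled.add(requestId);
    }
  }

  public static void main(String[] args) {
    FakeScheduler scheduler = new FakeScheduler();
    // Created once and reusable for every query, because it captures no request id.
    QueryCancelCallback shared = scheduler::cancel;
    shared.cancelQuery(1L);
    shared.cancelQuery(2L);
    System.out.println(scheduler.cancelled);
  }
}
```

   With this shape the accountant would keep only the set of active request 
ids, since the same callback serves all of them.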



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryAccountantCancelTest.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.queries;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+
+
+public class QueryAccountantCancelTest extends QueryRunnerTestBase {
+  private PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant _accountant;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1");
+    factory1.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME");
+    factory1.addSegment("a_REALTIME", QueryRunnerTest.buildRows("a_REALTIME"));
+
+    MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2");
+    factory2.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME");
+    factory2.addSegment("b_REALTIME", QueryRunnerTest.buildRows("b_REALTIME"));
+
+    _reducerHostname = "localhost";
+    _reducerPort = QueryTestUtils.getAvailablePort();
+    Map<String, Object> reducerConfig = new HashMap<>();
+    reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname);
+    reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _reducerPort);
+    _mailboxService = new MailboxService(_reducerHostname, _reducerPort, new PinotConfiguration(reducerConfig));
+    _mailboxService.start();
+
+    HashMap<String, Object> configs = getAccountingConfig();
+
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    _accountant =
+        new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs),
+            "testWithPerQueryAccountantFactory", InstanceType.SERVER);
+
+    QueryServerEnclosure server1 = new QueryServerEnclosure(factory1, Map.of(), _accountant);
+    server1.start();
+    // Start server1 to ensure the next server will have a different port.
+    QueryServerEnclosure server2 = new QueryServerEnclosure(factory2, Map.of(), _accountant);
+    server2.start();
+    // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
+    // this is only use for test identifier purpose.
+    int port1 = server1.getPort();
+    int port2 = server2.getPort();
+    _servers.put(new QueryServerInstance("Server_localhost_" + port1, "localhost", port1, port1), server1);
+    _servers.put(new QueryServerInstance("Server_localhost_" + port2, "localhost", port2, port2), server2);
+
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerPort, server1.getPort(), server2.getPort(),
+        factory1.getRegisteredSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+        null);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServerEnclosure server : _servers.values()) {
+      server.shutDown();
+    }
+    _mailboxService.shutdown();
+  }
+
+  @Test
+  void testWithPerQueryAccountantFactory() {
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(_accountant);
+
+      ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
+      Assert.assertEquals(resultTable.getRows().size(), 2);
+      // TODO: How to programmatically get the request id ?
+      assertNotNull(_accountant.getQueryCancelCallback("0"));

Review Comment:
   We are using correlation IDs here, right? In that case you can set the CID 
by using query options. See 
https://docs.pinot.apache.org/users/user-guide-query/query-correlation-id



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -491,6 +504,16 @@ public Map<String, AggregatedStats> aggregate(boolean isTriggered) {
     public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) {
     }
 
+    public boolean cancelQuery(String queryId) {
+      if (_queryCancelCallbacks.containsKey(queryId)) {
+        QueryCancelCallback callback = _queryCancelCallbacks.get(queryId);
+        callback.cancelQuery();
+        return true;

Review Comment:
   Could we remove the entry here? Or do we expect to try to cancel it again 
in the future?
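
   If a single cancellation attempt is enough, a sketch like the following 
would remove the entry and invoke the callback in one step. 
`CancelAndRemoveSketch` and `CALLBACKS` are hypothetical names, and `Runnable` 
stands in for `QueryCancelCallback`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CancelAndRemoveSketch {
  // Hypothetical stand-in for _queryCancelCallbacks; Runnable replaces QueryCancelCallback.
  static final ConcurrentHashMap<String, Runnable> CALLBACKS = new ConcurrentHashMap<>();

  // Removes the callback atomically and runs it, so each query is cancelled at most once.
  static boolean cancelQuery(String queryId) {
    Runnable callback = CALLBACKS.remove(queryId);
    if (callback == null) {
      return false;
    }
    callback.run();
    return true;
  }

  public static void main(String[] args) {
    AtomicInteger cancelCalls = new AtomicInteger();
    CALLBACKS.put("q1", cancelCalls::incrementAndGet);
    System.out.println(cancelQuery("q1")); // first attempt finds and runs the callback
    System.out.println(cancelQuery("q1")); // second attempt finds nothing to cancel
    System.out.println(cancelCalls.get());
  }
}
```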



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -814,14 +837,21 @@ void killAllQueries() {
 
         if (config.isOomKillQueryEnabled()) {
           int killedCount = 0;
+          Set<String> terminatedQueryIds = new HashSet<>();
           for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
             CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
             CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus();
-            if (taskEntry != null && taskEntry.isAnchorThread()) {
-              threadEntry._errorStatus
-                  .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType)));
-              taskEntry.getAnchorThread().interrupt();
-              killedCount += 1;
+            if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) {
+              if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) {
+                _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery();

Review Comment:
   I think there is a race condition here. It seems strange that Copilot 
doesn't detect it.
   
   The entry could be removed after this `containsKey` check but before we 
`get` the value, in which case `get` returns null and the call throws a 
`NullPointerException`.
   
   The solution is simple: always execute `get` and only call cancel if the 
returned value is not null.
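
   For reference, a minimal sketch of the single-lookup pattern. 
`SingleLookupSketch`, `CALLBACKS` and `cancelIfRegistered` are hypothetical 
names; only the `get` plus null check mirrors the suggestion.

```java
import java.util.concurrent.ConcurrentHashMap;

public class SingleLookupSketch {
  /** Local copy of the callback shape from the PR, declared here to keep the sketch standalone. */
  interface QueryCancelCallback {
    void cancelQuery();
  }

  // Hypothetical stand-in for _queryCancelCallbacks.
  static final ConcurrentHashMap<String, QueryCancelCallback> CALLBACKS = new ConcurrentHashMap<>();

  // Looks the callback up exactly once; a concurrent removal can only make the result null,
  // which is handled here, instead of failing between containsKey and get.
  static boolean cancelIfRegistered(String queryId) {
    QueryCancelCallback callback = CALLBACKS.get(queryId);
    if (callback == null) {
      return false;
    }
    callback.cancelQuery();
    return true;
  }

  public static void main(String[] args) {
    CALLBACKS.put("q1", () -> System.out.println("cancelling q1"));
    System.out.println(cancelIfRegistered("q1"));
    System.out.println(cancelIfRegistered("unknown"));
  }
}
```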



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

