gortiz commented on code in PR #16142: URL: https://github.com/apache/pinot/pull/16142#discussion_r2197712214
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -106,6 +107,8 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
     protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>();
     protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>();
+    protected final ConcurrentHashMap<String, QueryCancelCallback> _queryCancelCallbacks = new ConcurrentHashMap<>();

Review Comment:
Are we 100% sure this is going to be cleaned up even in case of error? Otherwise it would be better to use a Guava Cache, which will be cleaned up after a while by other threads when new values are added.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryCancelCallback.java:
##########
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+public interface QueryCancelCallback {
+  /**
+   * Cancels the query with the given queryId.
+   */
+  void cancelQuery();

Review Comment:
I like the idea of using an interface here, but I think we need to specify whether this method is blocking (so that once the call returns the query is guaranteed to be killed) or could be async.

##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -814,14 +837,21 @@ void killAllQueries() {
       if (config.isOomKillQueryEnabled()) {
         int killedCount = 0;
+        Set<String> terminatedQueryIds = new HashSet<>();
         for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
           CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
           CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus();
-          if (taskEntry != null && taskEntry.isAnchorThread()) {
-            threadEntry._errorStatus
-                .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType)));
-            taskEntry.getAnchorThread().interrupt();
-            killedCount += 1;
+          if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) {
+            if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) {
+              _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery();
+              terminatedQueryIds.add(taskEntry.getQueryId());
+            } else if (taskEntry.isAnchorThread()) {

Review Comment:
Do we still need this older option?
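
On the blocking-versus-async question raised for `cancelQuery()` above, one way to make the contract explicit is to have the callback return a future. The sketch below is only an illustration under stated assumptions: `AsyncCancelCallback` and `cancelAndWait` are hypothetical names that do not exist in the PR or in Pinot, and the PR's actual `QueryCancelCallback` returns `void`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancelContractSketch {
  /** Hypothetical variant of the callback; not the interface defined in the PR. */
  public interface AsyncCancelCallback {
    /**
     * Requests cancellation and returns immediately (non-blocking).
     * The returned future completes once the query has actually stopped.
     */
    CompletableFuture<Void> cancelQuery();
  }

  /**
   * Blocking wrapper for callers that need the stronger guarantee.
   * Returns true only if the query was confirmed stopped within the timeout.
   */
  public static boolean cancelAndWait(AsyncCancelCallback callback, long timeoutMs) {
    try {
      callback.cancelQuery().get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // Cancellation was requested but not confirmed in time, or it failed.
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller can observe it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    // A callback whose query is already stopped: the future is complete on return.
    AsyncCancelCallback alreadyStopped = () -> CompletableFuture.completedFuture(null);
    System.out.println(cancelAndWait(alreadyStopped, 100));
  }
}
```

With this shape the Javadoc states the non-blocking behaviour, and a caller such as an OOM killer can decide per call site whether to wait for confirmation. Keeping the `void` signature and documenting it as "request only, no guarantee on return" would be the lighter alternative.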
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -879,7 +909,11 @@ private void killMostExpensiveQuery() {
               .set(new RuntimeException(String.format(
                   " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
                   maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
-          interruptRunnerThread(maxUsageTuple.getAnchorThread());
+          if (_queryCancelCallbacks.containsKey(maxUsageTuple.getQueryId())) {
+            _queryCancelCallbacks.get(maxUsageTuple.getQueryId()).cancelQuery();

Review Comment:
Same race condition here

##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -905,7 +939,11 @@ private void killCPUTimeExceedQueries() {
                 String.format("Query %s got killed on %s: %s because using %d "
                     + "CPU time exceeding limit of %d ns CPU time",
                 value._queryId, _instanceType, _instanceId, value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS())));
-            interruptRunnerThread(value.getAnchorThread());
+            if (_queryCancelCallbacks.containsKey(value.getQueryId())) {
+              _queryCancelCallbacks.get(value.getQueryId()).cancelQuery();

Review Comment:
And here

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -267,6 +269,11 @@ public void shutDown() {
   /// If any error happened during the asynchronous execution, an error block will be sent to all receiver mailboxes.
   public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
       Map<String, String> requestMetadata, @Nullable ThreadExecutionContext parentContext) {
+    String requestIdStr = Long.toString(QueryThreadContext.getRequestId());
+    long requestId = QueryThreadContext.getRequestId();
+
+    _resourceUsageAccountant.registerQueryCancelCallback(requestIdStr, () -> _opChainScheduler.cancel(requestId));

Review Comment:
Not super important, but this allocates a new callback instance per query. Instead, we could change the signature of QueryCancelCallback to receive the request id as an argument, and then create a single callback instance and reuse it.

##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryAccountantCancelTest.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.queries;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertNotNull;
+
+
+public class QueryAccountantCancelTest extends QueryRunnerTestBase {
+  private PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant _accountant;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1");
+    factory1.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME");
+    factory1.addSegment("a_REALTIME", QueryRunnerTest.buildRows("a_REALTIME"));
+
+    MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2");
+    factory2.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME");
+    factory2.addSegment("b_REALTIME", QueryRunnerTest.buildRows("b_REALTIME"));
+
+    _reducerHostname = "localhost";
+    _reducerPort = QueryTestUtils.getAvailablePort();
+    Map<String, Object> reducerConfig = new HashMap<>();
+    reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname);
+    reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _reducerPort);
+    _mailboxService = new MailboxService(_reducerHostname, _reducerPort, new PinotConfiguration(reducerConfig));
+    _mailboxService.start();
+
+    HashMap<String, Object> configs = getAccountingConfig();
+
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    _accountant =
+        new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs),
+            "testWithPerQueryAccountantFactory", InstanceType.SERVER);
+
+    QueryServerEnclosure server1 = new QueryServerEnclosure(factory1, Map.of(), _accountant);
+    server1.start();
+    // Start server1 to ensure the next server will have a different port.
+    QueryServerEnclosure server2 = new QueryServerEnclosure(factory2, Map.of(), _accountant);
+    server2.start();
+    // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
+    // this is only use for test identifier purpose.
+    int port1 = server1.getPort();
+    int port2 = server2.getPort();
+    _servers.put(new QueryServerInstance("Server_localhost_" + port1, "localhost", port1, port1), server1);
+    _servers.put(new QueryServerInstance("Server_localhost_" + port2, "localhost", port2, port2), server2);
+
+    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerPort, server1.getPort(), server2.getPort(),
+        factory1.getRegisteredSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+        null);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServerEnclosure server : _servers.values()) {
+      server.shutDown();
+    }
+    _mailboxService.shutdown();
+  }
+
+  @Test
+  void testWithPerQueryAccountantFactory() {
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(_accountant);
+
+      ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
+      Assert.assertEquals(resultTable.getRows().size(), 2);
+      // TODO: How to programmatically get the request id ?
+      assertNotNull(_accountant.getQueryCancelCallback("0"));

Review Comment:
We are using correlation ids here, right? In that case you can set the CID by using query options. See https://docs.pinot.apache.org/users/user-guide-query/query-correlation-id

##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -491,6 +504,16 @@ public Map<String, AggregatedStats> aggregate(boolean isTriggered) {
     public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) {
     }
+    public boolean cancelQuery(String queryId) {
+      if (_queryCancelCallbacks.containsKey(queryId)) {
+        QueryCancelCallback callback = _queryCancelCallbacks.get(queryId);
+        callback.cancelQuery();
+        return true;

Review Comment:
Could we remove the entry here? Or do we expect to try to cancel them again in the future?
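
If removing the entry on cancel is acceptable, `ConcurrentHashMap.remove(key)` does the lookup and the removal in one atomic step, so there is no gap between a `containsKey` check and a later `get`. The following is a minimal sketch under that assumption, not the PR's implementation: `CancelRegistrySketch` is a hypothetical class, and the nested `QueryCancelCallback` is a local stand-in for the interface added in the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelRegistrySketch {
  /** Local stand-in for the PR's QueryCancelCallback interface. */
  public interface QueryCancelCallback {
    void cancelQuery();
  }

  private final ConcurrentHashMap<String, QueryCancelCallback> _queryCancelCallbacks = new ConcurrentHashMap<>();

  public void registerQueryCancelCallback(String queryId, QueryCancelCallback callback) {
    _queryCancelCallbacks.put(queryId, callback);
  }

  /** Returns true if a callback was registered for the query and has been invoked. */
  public boolean cancelQuery(String queryId) {
    // remove() returns the previous value (or null) and deletes the entry atomically,
    // so a concurrent unregister cannot slip in between a check and a lookup.
    QueryCancelCallback callback = _queryCancelCallbacks.remove(queryId);
    if (callback == null) {
      return false;
    }
    callback.cancelQuery();
    return true;
  }

  public static void main(String[] args) {
    CancelRegistrySketch registry = new CancelRegistrySketch();
    AtomicInteger cancelCount = new AtomicInteger();
    registry.registerQueryCancelCallback("q1", cancelCount::incrementAndGet);
    System.out.println(registry.cancelQuery("q1")); // first call finds and invokes the callback
    System.out.println(registry.cancelQuery("q1")); // entry is gone, nothing to invoke
    System.out.println(cancelCount.get());
  }
}
```

If the entry has to stay registered so that cancellation can be retried, the same shape works with `get(queryId)` in place of `remove(queryId)`: a single lookup followed by a null check.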
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -814,14 +837,21 @@ void killAllQueries() {
       if (config.isOomKillQueryEnabled()) {
         int killedCount = 0;
+        Set<String> terminatedQueryIds = new HashSet<>();
         for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
           CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
           CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus();
-          if (taskEntry != null && taskEntry.isAnchorThread()) {
-            threadEntry._errorStatus
-                .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType)));
-            taskEntry.getAnchorThread().interrupt();
-            killedCount += 1;
+          if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) {
+            if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) {
+              _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery();

Review Comment:
I think there is a race condition here. It seems strange that Copilot doesn't detect it. The entry could be removed after this `containsKey` check but before we get the value. The solution is simple: always call `get` and only call cancel if the returned value is not null.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org