vrajat commented on code in PR #16142: URL: https://github.com/apache/pinot/pull/16142#discussion_r2199603364
########## pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java: ########## @@ -814,14 +837,21 @@ void killAllQueries() { if (config.isOomKillQueryEnabled()) { int killedCount = 0; + Set<String> terminatedQueryIds = new HashSet<>(); for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) { CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue(); CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus(); - if (taskEntry != null && taskEntry.isAnchorThread()) { - threadEntry._errorStatus - .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType))); - taskEntry.getAnchorThread().interrupt(); - killedCount += 1; + if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) { + if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) { + _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery(); Review Comment: This part of the code (`WatcherTask`) is single-threaded. * There are multiple writers to the hash table, but every writer creates a new entry. * Gets and deletes are performed only by the `WatcherTask` thread. So each key is written once and read/deleted once. ########## pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java: ########## @@ -106,6 +107,8 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>(); protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>(); + protected final ConcurrentHashMap<String, QueryCancelCallback> _queryCancelCallbacks = new ConcurrentHashMap<>(); Review Comment: I'll check. At a minimum, I'll use a data structure that is bounded in size.
########## pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryAccountantCancelTest.java: ########## @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.queries; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; +import org.apache.pinot.query.QueryEnvironmentTestBase; +import org.apache.pinot.query.QueryServerEnclosure; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; +import org.apache.pinot.query.testutils.QueryTestUtils; +import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.Tracing; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import 
org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertNotNull; + + +public class QueryAccountantCancelTest extends QueryRunnerTestBase { + private PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant _accountant; + + @BeforeClass + public void setUp() + throws Exception { + MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1"); + factory1.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME"); + factory1.addSegment("a_REALTIME", QueryRunnerTest.buildRows("a_REALTIME")); + + MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2"); + factory2.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME"); + factory2.addSegment("b_REALTIME", QueryRunnerTest.buildRows("b_REALTIME")); + + _reducerHostname = "localhost"; + _reducerPort = QueryTestUtils.getAvailablePort(); + Map<String, Object> reducerConfig = new HashMap<>(); + reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname); + reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _reducerPort); + _mailboxService = new MailboxService(_reducerHostname, _reducerPort, new PinotConfiguration(reducerConfig)); + _mailboxService.start(); + + HashMap<String, Object> configs = getAccountingConfig(); + + ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); + _accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), + "testWithPerQueryAccountantFactory", InstanceType.SERVER); + + QueryServerEnclosure server1 = new QueryServerEnclosure(factory1, Map.of(), _accountant); + server1.start(); + // Start server1 to ensure the next server will have a different port. 
+ QueryServerEnclosure server2 = new QueryServerEnclosure(factory2, Map.of(), _accountant); + server2.start(); + // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. + // this is only use for test identifier purpose. + int port1 = server1.getPort(); + int port2 = server2.getPort(); + _servers.put(new QueryServerInstance("Server_localhost_" + port1, "localhost", port1, port1), server1); + _servers.put(new QueryServerInstance("Server_localhost_" + port2, "localhost", port2, port2), server2); + + _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerPort, server1.getPort(), server2.getPort(), + factory1.getRegisteredSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(), + null); + } + + @AfterClass + public void tearDown() { + for (QueryServerEnclosure server : _servers.values()) { + server.shutDown(); + } + _mailboxService.shutdown(); + } + + @Test + void testWithPerQueryAccountantFactory() { + try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { + tracing.when(Tracing::getThreadAccountant).thenReturn(_accountant); + + ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); + Assert.assertEquals(resultTable.getRows().size(), 2); + // TODO: How to programmatically get the request id ? + assertNotNull(_accountant.getQueryCancelCallback("0")); Review Comment: Not yet. The feature works with the correlation ID. However, the test doesn't work with the correlation ID - sigh. ########## pinot-spi/src/main/java/org/apache/pinot/spi/accounting/QueryCancelCallback.java: ########## @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.accounting; + +public interface QueryCancelCallback { + /** + * Cancels the query with the given queryId. + */ + void cancelQuery(); Review Comment: It will be async. I'll add Javadocs. ########## pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java: ########## @@ -491,6 +504,16 @@ public Map<String, AggregatedStats> aggregate(boolean isTriggered) { public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) { } + public boolean cancelQuery(String queryId) { + if (_queryCancelCallbacks.containsKey(queryId)) { + QueryCancelCallback callback = _queryCancelCallbacks.get(queryId); + callback.cancelQuery(); + return true; Review Comment: I can remove the entry here. However, it still has to be removed in `reapFinishedTasks` for cases where cancel is not called.
########## pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java: ########## @@ -814,14 +837,21 @@ void killAllQueries() { if (config.isOomKillQueryEnabled()) { int killedCount = 0; + Set<String> terminatedQueryIds = new HashSet<>(); for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) { CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue(); CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = threadEntry.getCurrentThreadTaskStatus(); - if (taskEntry != null && taskEntry.isAnchorThread()) { - threadEntry._errorStatus - .set(new RuntimeException(String.format("Query killed due to %s out of memory!", _instanceType))); - taskEntry.getAnchorThread().interrupt(); - killedCount += 1; + if (taskEntry != null && !terminatedQueryIds.contains(taskEntry.getQueryId())) { + if (_queryCancelCallbacks.containsKey(taskEntry.getQueryId())) { + _queryCancelCallbacks.get(taskEntry.getQueryId()).cancelQuery(); + terminatedQueryIds.add(taskEntry.getQueryId()); + } else if (taskEntry.isAnchorThread()) { Review Comment: I'll keep it for now. * I removed it and then reverted the change because the diff got too big. * Previously, I had added checks to make sure the anchor thread/parent context exists, and fixed NPEs, etc. I'll undo these changes, especially for MSE, in a separate thread with unit tests. It will be safer that way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org