This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 75b7b6fcbf Improvements to OOM protection for Multi-Stage Engine (#13955) 75b7b6fcbf is described below commit 75b7b6fcbfd0ba92f62a2b3469f18e1550013024 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Fri Sep 20 18:17:22 2024 +0530 Improvements to OOM protection for Multi-Stage Engine (#13955) * Add pinot.query.scheduler.accounting.enable.thread.sampling.mse.debug config option * Fixed some cases where query id were not set in workers --- .../CPUMemThreadLevelAccountingObjects.java | 2 +- .../PerQueryCPUMemAccountantFactory.java | 33 +++- .../LeafStageTransferableBlockOperator.java | 11 ++ .../query/runtime/operator/MultiStageOperator.java | 2 +- .../apache/pinot/query/QueryServerEnclosure.java | 5 +- .../runtime/queries/QueryRunnerAccountingTest.java | 183 +++++++++++++++++++++ .../query/runtime/queries/QueryRunnerTestBase.java | 11 +- .../accounting/ThreadResourceUsageAccountant.java | 5 + .../java/org/apache/pinot/spi/trace/Tracing.java | 32 +++- .../apache/pinot/spi/utils/CommonConstants.java | 4 + 10 files changed, 269 insertions(+), 19 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java index 378fcd991f..0c2e0daee4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java @@ -109,7 +109,7 @@ public class CPUMemThreadLevelAccountingObjects { return taskEntry == null ? 
ThreadExecutionContext.TaskType.UNKNOWN : taskEntry.getTaskType(); } - public void setThreadTaskStatus(@Nonnull String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + public void setThreadTaskStatus(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType, @Nonnull Thread anchorThread) { _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, anchorThread)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index fef0c3beeb..f8a5a71ea7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -124,6 +124,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory // track memory usage private final boolean _isThreadMemorySamplingEnabled; + // is sampling allowed for MSE queries + private final boolean _isThreadSamplingEnabledForMSE; + private final Set<String> _inactiveQuery; // the periodical task that aggregates and preempts queries @@ -157,6 +160,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled, _isThreadMemorySamplingEnabled); + _isThreadSamplingEnabledForMSE = + config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, + CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE); + LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE); + // ThreadMXBean wrapper _threadResourceUsageProvider = new ThreadLocal<>(); @@ -231,6 +239,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory sampleThreadCPUTime(); } + /** + * Sample Usage for Multi-stage 
engine queries + */ + @Override + public void sampleUsageMSE() { + if (_isThreadSamplingEnabledForMSE) { + sampleThreadBytesAllocated(); + sampleThreadCPUTime(); + } + } + /** * for testing only */ @@ -286,17 +305,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } @Override - public void createExecutionContextInner(@Nullable String queryId, int taskId, @Nullable - ThreadExecutionContext parentContext) { + public void createExecutionContextInner(@Nullable String queryId, int taskId, + ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { _threadLocalEntry.get()._errorStatus.set(null); if (parentContext == null) { // is anchor thread assert queryId != null; - _threadLocalEntry.get().setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, - ThreadExecutionContext.TaskType.UNKNOWN, Thread.currentThread()); + _threadLocalEntry.get() + .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread()); } else { // not anchor thread - _threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(), + _threadLocalEntry.get().setThreadTaskStatus(queryId, taskId, parentContext.getTaskType(), parentContext.getAnchorThread()); } } @@ -306,6 +325,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory return _threadLocalEntry.get().getCurrentThreadTaskStatus(); } + public CPUMemThreadLevelAccountingObjects.ThreadEntry getThreadEntry() { + return _threadLocalEntry.get(); + } + /** * clears thread accounting info once a runner/worker thread has finished a particular run */ diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 7a729460b1..5a433ba370 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -57,6 +57,9 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.utils.TypeUtils; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -270,10 +273,14 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { private Future<Void> startExecution() { ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer(); ServerQueryLogger queryLogger = ServerQueryLogger.getInstance(); + ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); return _executorService.submit(() -> { try { if (_requests.size() == 1) { ServerQueryRequest request = _requests.get(0); + ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); + Tracing.ThreadAccountantOps.setupWorker(1, threadResourceUsageProvider, parentContext); + InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(request, _executorService, resultsBlockConsumer); if (queryLogger != null) { @@ -303,7 +310,11 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { CountDownLatch latch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { ServerQueryRequest request = _requests.get(i); + int taskId = i; futures[i] = _executorService.submit(() -> { + ThreadResourceUsageProvider 
threadResourceUsageProvider = new ThreadResourceUsageProvider(); + Tracing.ThreadAccountantOps.setupWorker(taskId, threadResourceUsageProvider, parentContext); + try { InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(request, _executorService, resultsBlockConsumer); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 2690a5a7f7..16d2af4610 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -69,7 +69,7 @@ public abstract class MultiStageOperator // Samples resource usage of the operator. The operator should call this function for every block of data or // assuming the block holds 10000 rows or more. protected void sampleAndCheckInterruption() { - Tracing.ThreadAccountantOps.sample(); + Tracing.ThreadAccountantOps.sampleMSE(); if (Tracing.ThreadAccountantOps.isInterrupted()) { earlyTerminate(); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 1d2d81993c..437fee82fa 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -32,6 +32,7 @@ import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; import org.apache.pinot.query.testutils.QueryTestUtils; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import 
org.apache.pinot.spi.utils.CommonConstants; @@ -115,9 +116,9 @@ public class QueryServerEnclosure { } public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, - Map<String, String> requestMetadataMap) { + Map<String, String> requestMetadataMap, ThreadExecutionContext parentContext) { return CompletableFuture.runAsync( - () -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadataMap, null), + () -> _queryRunner.processQuery(workerMetadata, stagePlan, requestMetadataMap, parentContext), _queryRunner.getExecutorService()); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java new file mode 100644 index 0000000000..3aa6556b1b --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.queries; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; +import org.apache.pinot.query.QueryEnvironmentTestBase; +import org.apache.pinot.query.QueryServerEnclosure; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; +import org.apache.pinot.query.testutils.QueryTestUtils; +import org.apache.pinot.spi.accounting.QueryResourceTracker; +import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; +import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.EarlyTerminationException; +import org.apache.pinot.spi.trace.Tracing; +import org.apache.pinot.spi.utils.CommonConstants; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class QueryRunnerAccountingTest extends QueryRunnerTestBase { + + @BeforeClass + public void setUp() + throws Exception { + MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1"); + factory1.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME"); + factory1.addSegment("a_REALTIME", QueryRunnerTest.buildRows("a_REALTIME")); + + MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2"); + factory2.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME"); + factory2.addSegment("b_REALTIME", QueryRunnerTest.buildRows("b_REALTIME")); + + _reducerHostname = "localhost"; + 
_reducerPort = QueryTestUtils.getAvailablePort(); + Map<String, Object> reducerConfig = new HashMap<>(); + reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, _reducerHostname); + reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _reducerPort); + _mailboxService = new MailboxService(_reducerHostname, _reducerPort, new PinotConfiguration(reducerConfig)); + _mailboxService.start(); + + QueryServerEnclosure server1 = new QueryServerEnclosure(factory1); + server1.start(); + // Start server1 to ensure the next server will have a different port. + QueryServerEnclosure server2 = new QueryServerEnclosure(factory2); + server2.start(); + // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. + // this is only use for test identifier purpose. + int port1 = server1.getPort(); + int port2 = server2.getPort(); + _servers.put(new QueryServerInstance("localhost", port1, port1), server1); + _servers.put(new QueryServerInstance("localhost", port2, port2), server2); + + _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerPort, server1.getPort(), server2.getPort(), + factory1.getRegisteredSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(), + null); + } + + @AfterClass + public void tearDown() { + for (QueryServerEnclosure server : _servers.values()) { + server.shutDown(); + } + _mailboxService.shutdown(); + } + + @Test + void testWithDefaultThreadAccountant() { + Tracing.DefaultThreadResourceUsageAccountant accountant = new Tracing.DefaultThreadResourceUsageAccountant(); + try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { + tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); + + ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); + Assert.assertEquals(resultTable.getRows().size(), 2); + + 
ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant(); + Assert.assertTrue(threadAccountant.getThreadResources().isEmpty()); + Assert.assertTrue(threadAccountant.getQueryResources().isEmpty()); + } + } + + @Test + void testWithPerQueryAccountantFactory() { + HashMap<String, Object> configs = getAccountingConfig(); + + ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), + "testWithPerQueryAccountantFactory"); + + try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { + tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); + + ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); + Assert.assertEquals(resultTable.getRows().size(), 2); + + Map<String, ? extends QueryResourceTracker> resources = accountant.getQueryResources(); + Assert.assertEquals(resources.size(), 1); + Assert.assertTrue(resources.entrySet().iterator().next().getValue().getAllocatedBytes() > 0); + } + } + + @Test + void testDisableSamplingForMSE() { + HashMap<String, Object> configs = getAccountingConfig(); + configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false); + + ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), + "testWithPerQueryAccountantFactory"); + + try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { + tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); + ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); + 
Assert.assertEquals(resultTable.getRows().size(), 2); + + Map<String, ? extends QueryResourceTracker> resources = accountant.getQueryResources(); + Assert.assertEquals(resources.size(), 1); + Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(), 0); + } + } + + public static class InterruptingAccountant + extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { + + public InterruptingAccountant(PinotConfiguration config, String instanceId) { + super(config, instanceId); + } + + @Override + public boolean isAnchorThreadInterrupted() { + return true; + } + } + + @Test(expectedExceptions = EarlyTerminationException.class) + void testInterrupt() { + HashMap<String, Object> configs = getAccountingConfig(); + + ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); + InterruptingAccountant accountant = + new InterruptingAccountant(new PinotConfiguration(configs), "testWithPerQueryAccountantFactory"); + + try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { + tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); + queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); + } + } + + private static HashMap<String, Object> getAccountingConfig() { + HashMap<String, Object> configs = new HashMap<>(); + ServerMetrics.register(Mockito.mock(ServerMetrics.class)); + configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true); + return configs; + } +} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index de65466725..e6d6d43424 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -61,9 +61,11 @@ 
import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.StagePlan; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.StringUtil; @@ -133,7 +135,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { List<CompletableFuture<?>> submissionStubs = new ArrayList<>(); for (int stageId = 0; stageId < stagePlans.size(); stageId++) { if (stageId != 0) { - submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap)); + submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, requestId, stageId, + requestMetadataMap)); } } try { @@ -155,20 +158,22 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { } protected List<CompletableFuture<?>> processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan, - int stageId, Map<String, String> requestMetadataMap) { + long requestId, int stageId, Map<String, String> requestMetadataMap) { DispatchablePlanFragment dispatchableStagePlan = dispatchableSubPlan.getQueryStageList().get(stageId); List<WorkerMetadata> stageWorkerMetadataList = dispatchableStagePlan.getWorkerMetadataList(); List<CompletableFuture<?>> submissionStubs = new ArrayList<>(); for (Map.Entry<QueryServerInstance, List<Integer>> entry : dispatchableStagePlan.getServerInstanceToWorkerIdMap() .entrySet()) { QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey()); + Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE); + ThreadExecutionContext 
parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); List<WorkerMetadata> workerMetadataList = entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList()); StageMetadata stageMetadata = new StageMetadata(stageId, workerMetadataList, dispatchableStagePlan.getCustomProperties()); StagePlan stagePlan = new StagePlan(dispatchableStagePlan.getPlanFragment().getFragmentRoot(), stageMetadata); for (WorkerMetadata workerMetadata : workerMetadataList) { - submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap)); + submissionStubs.add(serverEnclosure.processQuery(workerMetadata, stagePlan, requestMetadataMap, parentContext)); } } return submissionStubs; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index bab51cf21c..a3d1061dd6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -60,6 +60,11 @@ public interface ThreadResourceUsageAccountant { */ void sampleUsage(); + /** + * Sample Usage for Multi-stage engine queries + */ + void sampleUsageMSE(); + /** * special interface to aggregate usage to the stats store only once, it is used for response * ser/de threads where the thread execution context cannot be setup before hands as diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 59ad65eef5..1f43d5e0d7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; 
+import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadExecutionContext; @@ -195,18 +197,23 @@ public class Tracing { public void sampleUsage() { } + @Override + public void sampleUsageMSE() { + } + @Override public void updateQueryUsageConcurrently(String queryId) { } @Override - public final void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, - ThreadExecutionContext parentContext) { + public final void createExecutionContext(@Nullable String queryId, int taskId, + ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { _anchorThread.set(parentContext == null ? Thread.currentThread() : parentContext.getAnchorThread()); - createExecutionContextInner(queryId, taskId, parentContext); + createExecutionContextInner(queryId, taskId, taskType, parentContext); } - public void createExecutionContextInner(String queryId, int taskId, ThreadExecutionContext parentContext) { + public void createExecutionContextInner(@Nullable String queryId, int taskId, + ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { } @Override @@ -259,11 +266,11 @@ public class Tracing { private ThreadAccountantOps() { } - public static void setupRunner(String queryId) { + public static void setupRunner(@Nonnull String queryId) { setupRunner(queryId, ThreadExecutionContext.TaskType.SSE); } - public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType) { + public static void setupRunner(@Nonnull String queryId, ThreadExecutionContext.TaskType taskType) { Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); Tracing.getThreadAccountant() .createExecutionContext(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, null); @@ 
-290,13 +297,24 @@ public class Tracing { ThreadResourceUsageProvider threadResourceUsageProvider, ThreadExecutionContext threadExecutionContext) { Tracing.getThreadAccountant().setThreadResourceUsageProvider(threadResourceUsageProvider); - Tracing.getThreadAccountant().createExecutionContext(null, taskId, taskType, threadExecutionContext); + String queryId = null; + if (threadExecutionContext != null) { + queryId = threadExecutionContext.getQueryId(); + } else { + LOGGER.warn("Request ID not available. ParentContext not set for query worker thread."); + } + Tracing.getThreadAccountant() + .createExecutionContext(queryId, taskId, taskType, threadExecutionContext); } public static void sample() { Tracing.getThreadAccountant().sampleUsage(); } + public static void sampleMSE() { + Tracing.getThreadAccountant().sampleUsageMSE(); + } + public static void clear() { Tracing.getThreadAccountant().clear(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 58905ba71f..f3ea4ecba4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -979,6 +979,10 @@ public class CommonConstants { public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled"; public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false; + + public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = + "accounting.enable.thread.sampling.mse.debug"; + public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true; } public static class ExecutorService { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org