This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c8eb53a4d3 Make OOM Protection work with GRPC Queries (#16004) c8eb53a4d3 is described below commit c8eb53a4d3823700956badc29eac4c6ed81e0b7d Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Wed Jun 11 16:53:45 2025 +0530 Make OOM Protection work with GRPC Queries (#16004) --- .../CPUMemThreadLevelAccountingObjects.java | 5 +- .../PerQueryCPUMemAccountantFactory.java | 41 ++++++++++---- .../tests/OfflineGRPCServerIntegrationTest.java | 2 +- ...lineGRPCServerOOMAccountingIntegrationTest.java | 46 +++++++++++++++ .../pinot/query/runtime/operator/OpChain.java | 2 + .../spi/accounting/ThreadExecutionContext.java | 2 +- .../spi/accounting/ThreadResourceTracker.java | 2 + .../accounting/ThreadResourceUsageAccountant.java | 23 +++++++- .../java/org/apache/pinot/spi/trace/Tracing.java | 66 +++++++--------------- 9 files changed, 125 insertions(+), 64 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java index 0c2e0daee4..eb7ac33e2b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java @@ -93,9 +93,10 @@ public class CPUMemThreadLevelAccountingObjects { return _currentThreadMemoryAllocationSampleBytes; } + @Nullable public String getQueryId() { TaskEntry taskEntry = _currentThreadTaskStatus.get(); - return taskEntry == null ? "" : taskEntry.getQueryId(); + return taskEntry == null ? null : taskEntry.getQueryId(); } public int getTaskId() { @@ -109,7 +110,7 @@ public class CPUMemThreadLevelAccountingObjects { return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN : taskEntry.getTaskType(); } - public void setThreadTaskStatus(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + public void setThreadTaskStatus(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, @Nonnull Thread anchorThread) { _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, anchorThread)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 4f1fa0087f..2430ce7895 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -49,7 +49,6 @@ import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; -import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +66,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory return new PerQueryCPUMemResourceUsageAccountant(config, instanceId, instanceType); } - public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant { + public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResourceUsageAccountant { /** * MemoryMXBean to get total heap used memory @@ -254,6 +253,22 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } } + @Override + public boolean isAnchorThreadInterrupted() { + ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus(); + if (context != null && context.getAnchorThread() != null) { + return context.getAnchorThread().isInterrupted(); + } + + return false; + } + + @Override + @Deprecated + public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { + } + /** * for testing only */ @@ -310,22 +325,26 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } @Override - public void createExecutionContextInner(@Nullable String queryId, int taskId, - ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { + public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) { _threadLocalEntry.get()._errorStatus.set(null); - if (parentContext == null) { - // is anchor thread - assert queryId != null; + if (queryId != null) { _threadLocalEntry.get() .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread()); - } else { - // not anchor thread - _threadLocalEntry.get().setThreadTaskStatus(queryId, taskId, parentContext.getTaskType(), + } + } + + @Override + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { + _threadLocalEntry.get()._errorStatus.set(null); + if (parentContext != null && parentContext.getQueryId() != null && parentContext.getAnchorThread() != null) { + _threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(), parentContext.getAnchorThread()); } } @Override + @Nullable public ThreadExecutionContext getThreadExecutionContext() { return _threadLocalEntry.get().getCurrentThreadTaskStatus(); } @@ -345,8 +364,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory threadEntry.setToIdle(); // clear threadResourceUsageProvider _threadResourceUsageProvider.remove(); - // clear _anchorThread - super.clear(); } @Override diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java index 5f08d73a36..461376b714 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java @@ -244,7 +244,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) { // verify the returned data table metadata only contains "responseSerializationCpuTimeNs". Map<String, String> metadata = dataTable.getMetadata(); - assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); + assertTrue(metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); assertNotNull(dataTable.getDataSchema()); numTotalDocs += dataTable.getNumberOfRows(); } else { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java new file mode 100644 index 0000000000..2ffe028f9c --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; + + +public class OfflineGRPCServerOOMAccountingIntegrationTest extends OfflineGRPCServerIntegrationTest { + protected void overrideServerConf(PinotConfiguration serverConf) { + serverConf.setProperty( + CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, + "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory"); + serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true); + serverConf.setProperty( + CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true); + serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true); + serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + + CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, true); + } + + protected void overrideBrokerConf(PinotConfiguration serverConf) { + overrideServerConf(serverConf); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index cd2210a0f0..3e9651ac74 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.operator; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.slf4j.Logger; @@ -63,6 +64,7 @@ public class OpChain implements AutoCloseable { return _root; } + @Nullable public ThreadExecutionContext getParentContext() { return _parentContext; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java index 7ea59ad378..320f187605 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java @@ -28,7 +28,7 @@ public interface ThreadExecutionContext { * MSE: Multi Stage Engine * UNKNOWN: Default */ - public enum TaskType { + enum TaskType { SSE, MSE, UNKNOWN diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java index 418210b376..d084bf40d4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.accounting; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import javax.annotation.Nullable; /** @@ -42,6 +43,7 @@ public interface ThreadResourceTracker { * QueryId of the task the thread is executing. * @return a string containing the query id. */ + @Nullable String getQueryId(); /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index a3d1061dd6..6a33ac3ac0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -37,17 +37,34 @@ public interface ThreadResourceUsageAccountant { boolean isAnchorThreadInterrupted(); /** - * Task tracking info + * This method has been deprecated and replaced by {@link setupRunner(String, int, ThreadExecutionContext.TaskType)} + * and {@link setupWorker(int, ThreadExecutionContext.TaskType, ThreadExecutionContext)}. + */ + @Deprecated + void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext); + + /** + * Set up the thread execution context for an anchor a.k.a runner thread. * @param queryId query id string * @param taskId a unique task id - * @param parentContext the parent execution context, null for root(runner) thread + * @param taskType the type of the task - SSE or MSE */ - void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType taskType); + + /** + * Set up the thread execution context for a worker thread. + * @param taskId a unique task id + * @param taskType the type of the task - SSE or MSE + * @param parentContext the parent execution context + */ + void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext); /** * get the executon context of current thread */ + @Nullable ThreadExecutionContext getThreadExecutionContext(); /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 964a64bad1..198d083c57 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; @@ -172,22 +171,23 @@ public class Tracing { */ public static class DefaultThreadResourceUsageAccountant implements ThreadResourceUsageAccountant { - // worker thread's corresponding anchor thread, worker will also interrupt if it finds anchor's flag is raised - private final ThreadLocal<Thread> _anchorThread; - - public DefaultThreadResourceUsageAccountant() { - _anchorThread = new ThreadLocal<>(); + @Override + public boolean isAnchorThreadInterrupted() { + return false; } @Override - public boolean isAnchorThreadInterrupted() { - Thread thread = _anchorThread.get(); - return thread != null && thread.isInterrupted(); + public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { + } + + @Deprecated + public void createExecutionContextInner(@Nullable String queryId, int taskId, + ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { } @Override public void clear() { - _anchorThread.remove(); } @Override @@ -207,34 +207,18 @@ public class Tracing { } @Override - public final void createExecutionContext(@Nullable String queryId, int taskId, - ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { - _anchorThread.set(parentContext == null ? Thread.currentThread() : parentContext.getAnchorThread()); - createExecutionContextInner(queryId, taskId, taskType, parentContext); + public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) { } - public void createExecutionContextInner(@Nullable String queryId, int taskId, - ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) { + @Override + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { } @Override + @Nullable public ThreadExecutionContext getThreadExecutionContext() { - return new ThreadExecutionContext() { - @Override - public String getQueryId() { - return null; - } - - @Override - public Thread getAnchorThread() { - return _anchorThread.get(); - } - - @Override - public TaskType getTaskType() { - return TaskType.UNKNOWN; - } - }; + return null; } @Override @@ -267,14 +251,13 @@ public class Tracing { private ThreadAccountantOps() { } - public static void setupRunner(@Nonnull String queryId) { + public static void setupRunner(String queryId) { setupRunner(queryId, ThreadExecutionContext.TaskType.SSE); } - public static void setupRunner(@Nonnull String queryId, ThreadExecutionContext.TaskType taskType) { + public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType) { Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); - Tracing.getThreadAccountant() - .createExecutionContext(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, null); + Tracing.getThreadAccountant().setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType); } /** @@ -292,16 +275,9 @@ public class Tracing { * @param threadExecutionContext Context holds metadata about the query. */ public static void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, - ThreadExecutionContext threadExecutionContext) { + @Nullable ThreadExecutionContext threadExecutionContext) { Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider()); - String queryId = null; - if (threadExecutionContext != null) { - queryId = threadExecutionContext.getQueryId(); - } else { - LOGGER.warn("Request ID not available. ParentContext not set for query worker thread."); - } - Tracing.getThreadAccountant() - .createExecutionContext(queryId, taskId, taskType, threadExecutionContext); + Tracing.getThreadAccountant().setupWorker(taskId, taskType, threadExecutionContext); } public static void sample() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org