This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c8eb53a4d3 Make OOM Protection work with GRPC Queries (#16004)
c8eb53a4d3 is described below

commit c8eb53a4d3823700956badc29eac4c6ed81e0b7d
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Wed Jun 11 16:53:45 2025 +0530

    Make OOM Protection work with GRPC Queries (#16004)
---
 .../CPUMemThreadLevelAccountingObjects.java        |  5 +-
 .../PerQueryCPUMemAccountantFactory.java           | 41 ++++++++++----
 .../tests/OfflineGRPCServerIntegrationTest.java    |  2 +-
 ...lineGRPCServerOOMAccountingIntegrationTest.java | 46 +++++++++++++++
 .../pinot/query/runtime/operator/OpChain.java      |  2 +
 .../spi/accounting/ThreadExecutionContext.java     |  2 +-
 .../spi/accounting/ThreadResourceTracker.java      |  2 +
 .../accounting/ThreadResourceUsageAccountant.java  | 23 +++++++-
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 66 +++++++---------------
 9 files changed, 125 insertions(+), 64 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 0c2e0daee4..eb7ac33e2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -93,9 +93,10 @@ public class CPUMemThreadLevelAccountingObjects {
       return _currentThreadMemoryAllocationSampleBytes;
     }
 
+    @Nullable
     public String getQueryId() {
       TaskEntry taskEntry = _currentThreadTaskStatus.get();
-      return taskEntry == null ? "" : taskEntry.getQueryId();
+      return taskEntry == null ? null : taskEntry.getQueryId();
     }
 
     public int getTaskId() {
@@ -109,7 +110,7 @@ public class CPUMemThreadLevelAccountingObjects {
       return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN : taskEntry.getTaskType();
     }
 
-    public void setThreadTaskStatus(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
+    public void setThreadTaskStatus(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
         @Nonnull Thread anchorThread) {
       _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, anchorThread));
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 4f1fa0087f..2430ce7895 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -49,7 +49,6 @@ import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +66,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     return new PerQueryCPUMemResourceUsageAccountant(config, instanceId, instanceType);
   }
 
-  public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
+  public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResourceUsageAccountant {
 
     /**
      * MemoryMXBean to get total heap used memory
@@ -254,6 +253,22 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       }
     }
 
+    @Override
+    public boolean isAnchorThreadInterrupted() {
+      ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus();
+      if (context != null && context.getAnchorThread() != null) {
+        return context.getAnchorThread().isInterrupted();
+      }
+
+      return false;
+    }
+
+    @Override
+    @Deprecated
+    public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
+        @Nullable ThreadExecutionContext parentContext) {
+    }
+
     /**
      * for testing only
      */
@@ -310,22 +325,26 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     }
 
     @Override
-    public void createExecutionContextInner(@Nullable String queryId, int taskId,
-        ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) {
+    public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) {
       _threadLocalEntry.get()._errorStatus.set(null);
-      if (parentContext == null) {
-        // is anchor thread
-        assert queryId != null;
+      if (queryId != null) {
         _threadLocalEntry.get()
             .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread());
-      } else {
-        // not anchor thread
-      _threadLocalEntry.get().setThreadTaskStatus(queryId, taskId, parentContext.getTaskType(),
+      }
+    }
+
+    @Override
+    public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
+        @Nullable ThreadExecutionContext parentContext) {
+      _threadLocalEntry.get()._errorStatus.set(null);
+      if (parentContext != null && parentContext.getQueryId() != null && parentContext.getAnchorThread() != null) {
+        _threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(),
             parentContext.getAnchorThread());
       }
     }
 
     @Override
+    @Nullable
     public ThreadExecutionContext getThreadExecutionContext() {
       return _threadLocalEntry.get().getCurrentThreadTaskStatus();
     }
@@ -345,8 +364,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       threadEntry.setToIdle();
       // clear threadResourceUsageProvider
       _threadResourceUsageProvider.remove();
-      // clear _anchorThread
-      super.clear();
     }
 
     @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
index 5f08d73a36..461376b714 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerIntegrationTest.java
@@ -244,7 +244,7 @@ public class OfflineGRPCServerIntegrationTest extends BaseClusterIntegrationTest
       if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
         // verify the returned data table metadata only contains "responseSerializationCpuTimeNs".
         Map<String, String> metadata = dataTable.getMetadata();
-        assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
+        assertTrue(metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
         assertNotNull(dataTable.getDataSchema());
         numTotalDocs += dataTable.getNumberOfRows();
       } else {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java
new file mode 100644
index 0000000000..2ffe028f9c
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineGRPCServerOOMAccountingIntegrationTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class OfflineGRPCServerOOMAccountingIntegrationTest extends OfflineGRPCServerIntegrationTest {
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(
+        CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    serverConf.setProperty(
+        CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+            + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
+    serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    serverConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, true);
+  }
+
+  protected void overrideBrokerConf(PinotConfiguration serverConf) {
+    overrideServerConf(serverConf);
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index cd2210a0f0..3e9651ac74 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class OpChain implements AutoCloseable {
     return _root;
   }
 
+  @Nullable
   public ThreadExecutionContext getParentContext() {
     return _parentContext;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
index 7ea59ad378..320f187605 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
@@ -28,7 +28,7 @@ public interface ThreadExecutionContext {
     * MSE: Multi Stage Engine
     * UNKNOWN: Default
     */
-   public enum TaskType {
+   enum TaskType {
       SSE,
       MSE,
       UNKNOWN
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
index 418210b376..d084bf40d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.accounting;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import javax.annotation.Nullable;
 
 
 /**
@@ -42,6 +43,7 @@ public interface ThreadResourceTracker {
    * QueryId of the task the thread is executing.
    * @return a string containing the query id.
    */
+  @Nullable
   String getQueryId();
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index a3d1061dd6..6a33ac3ac0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -37,17 +37,34 @@ public interface ThreadResourceUsageAccountant {
   boolean isAnchorThreadInterrupted();
 
   /**
-   * Task tracking info
+   * This method has been deprecated and replaced by {@link setupRunner(String, int, ThreadExecutionContext.TaskType)}
+   * and {@link setupWorker(int, ThreadExecutionContext.TaskType, ThreadExecutionContext)}.
+   */
+  @Deprecated
+  void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
+      @Nullable ThreadExecutionContext parentContext);
+
+  /**
+   * Set up the thread execution context for an anchor a.k.a runner thread.
    * @param queryId query id string
    * @param taskId a unique task id
-   * @param parentContext the parent execution context, null for root(runner) thread
+   * @param taskType the type of the task - SSE or MSE
    */
-  void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
+  void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType taskType);
+
+  /**
+   * Set up the thread execution context for a worker thread.
+   * @param taskId a unique task id
+   * @param taskType the type of the task - SSE or MSE
+   * @param parentContext the parent execution context
+   */
+  void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
       @Nullable ThreadExecutionContext parentContext);
 
   /**
    * get the executon context of current thread
    */
+  @Nullable
   ThreadExecutionContext getThreadExecutionContext();
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 964a64bad1..198d083c57 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
@@ -172,22 +171,23 @@ public class Tracing {
    */
   public static class DefaultThreadResourceUsageAccountant implements ThreadResourceUsageAccountant {
 
-    // worker thread's corresponding anchor thread, worker will also interrupt if it finds anchor's flag is raised
-    private final ThreadLocal<Thread> _anchorThread;
-
-    public DefaultThreadResourceUsageAccountant() {
-      _anchorThread = new ThreadLocal<>();
+    @Override
+    public boolean isAnchorThreadInterrupted() {
+      return false;
     }
 
     @Override
-    public boolean isAnchorThreadInterrupted() {
-      Thread thread = _anchorThread.get();
-      return thread != null && thread.isInterrupted();
+    public void createExecutionContext(String queryId, int taskId, ThreadExecutionContext.TaskType taskType,
+        @Nullable ThreadExecutionContext parentContext) {
+    }
+
+    @Deprecated
+    public void createExecutionContextInner(@Nullable String queryId, int taskId,
+        ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) {
     }
 
     @Override
     public void clear() {
-      _anchorThread.remove();
     }
 
     @Override
@@ -207,34 +207,18 @@ public class Tracing {
     }
 
     @Override
-    public final void createExecutionContext(@Nullable String queryId, int taskId,
-        ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) {
-      _anchorThread.set(parentContext == null ? Thread.currentThread() : parentContext.getAnchorThread());
-      createExecutionContextInner(queryId, taskId, taskType, parentContext);
+    public void setupRunner(@Nullable String queryId, int taskId, ThreadExecutionContext.TaskType taskType) {
     }
 
-    public void createExecutionContextInner(@Nullable String queryId, int taskId,
-        ThreadExecutionContext.TaskType taskType, @Nullable ThreadExecutionContext parentContext) {
+    @Override
+    public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
+        @Nullable ThreadExecutionContext parentContext) {
     }
 
     @Override
+    @Nullable
     public ThreadExecutionContext getThreadExecutionContext() {
-      return new ThreadExecutionContext() {
-        @Override
-        public String getQueryId() {
-          return null;
-        }
-
-        @Override
-        public Thread getAnchorThread() {
-          return _anchorThread.get();
-        }
-
-        @Override
-        public TaskType getTaskType() {
-          return TaskType.UNKNOWN;
-        }
-      };
+      return null;
     }
 
     @Override
@@ -267,14 +251,13 @@ public class Tracing {
     private ThreadAccountantOps() {
     }
 
-    public static void setupRunner(@Nonnull String queryId) {
+    public static void setupRunner(String queryId) {
       setupRunner(queryId, ThreadExecutionContext.TaskType.SSE);
     }
 
-    public static void setupRunner(@Nonnull String queryId, ThreadExecutionContext.TaskType taskType) {
+    public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType) {
       Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
-      Tracing.getThreadAccountant()
-          .createExecutionContext(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, null);
+      Tracing.getThreadAccountant().setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType);
     }
 
     /**
@@ -292,16 +275,9 @@ public class Tracing {
      * @param threadExecutionContext Context holds metadata about the query.
      */
     public static void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
-        ThreadExecutionContext threadExecutionContext) {
+        @Nullable ThreadExecutionContext threadExecutionContext) {
       Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
-      String queryId = null;
-      if (threadExecutionContext != null) {
-        queryId = threadExecutionContext.getQueryId();
-      } else {
-        LOGGER.warn("Request ID not available. ParentContext not set for query worker thread.");
-      }
-      Tracing.getThreadAccountant()
-          .createExecutionContext(queryId, taskId, taskType, threadExecutionContext);
+      Tracing.getThreadAccountant().setupWorker(taskId, taskType, threadExecutionContext);
     }
 
     public static void sample() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to