This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 75b7b6fcbf Improvements to OOM protection for Multi-Stage Engine (#13955)
75b7b6fcbf is described below

commit 75b7b6fcbfd0ba92f62a2b3469f18e1550013024
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Fri Sep 20 18:17:22 2024 +0530

    Improvements to OOM protection for Multi-Stage Engine (#13955)
    
    * Add the pinot.query.scheduler.accounting.enable.thread.sampling.mse.debug config option
    * Fix some cases where the query ID was not set in worker threads
---
 .../CPUMemThreadLevelAccountingObjects.java        |   2 +-
 .../PerQueryCPUMemAccountantFactory.java           |  33 +++-
 .../LeafStageTransferableBlockOperator.java        |  11 ++
 .../query/runtime/operator/MultiStageOperator.java |   2 +-
 .../apache/pinot/query/QueryServerEnclosure.java   |   5 +-
 .../runtime/queries/QueryRunnerAccountingTest.java | 183 +++++++++++++++++++++
 .../query/runtime/queries/QueryRunnerTestBase.java |  11 +-
 .../accounting/ThreadResourceUsageAccountant.java  |   5 +
 .../java/org/apache/pinot/spi/trace/Tracing.java   |  32 +++-
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 10 files changed, 269 insertions(+), 19 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 378fcd991f..0c2e0daee4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -109,7 +109,7 @@ public class CPUMemThreadLevelAccountingObjects {
       return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN : 
taskEntry.getTaskType();
     }
 
-    public void setThreadTaskStatus(@Nonnull String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
+    public void setThreadTaskStatus(@Nullable String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
         @Nonnull Thread anchorThread) {
       _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, 
anchorThread));
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index fef0c3beeb..f8a5a71ea7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -124,6 +124,9 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     // track memory usage
     private final boolean _isThreadMemorySamplingEnabled;
 
+    // is sampling allowed for MSE queries
+    private final boolean _isThreadSamplingEnabledForMSE;
+
     private final Set<String> _inactiveQuery;
 
     // the periodical task that aggregates and preempts queries
@@ -157,6 +160,11 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       LOGGER.info("_isThreadCPUSamplingEnabled: {}, 
_isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
           _isThreadMemorySamplingEnabled);
 
+      _isThreadSamplingEnabledForMSE =
+          
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
+              CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
+      LOGGER.info("_isThreadSamplingEnabledForMSE: {}", 
_isThreadSamplingEnabledForMSE);
+
       // ThreadMXBean wrapper
       _threadResourceUsageProvider = new ThreadLocal<>();
 
@@ -231,6 +239,17 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       sampleThreadCPUTime();
     }
 
+    /**
+     * Sample Usage for Multi-stage engine queries
+     */
+    @Override
+    public void sampleUsageMSE() {
+      if (_isThreadSamplingEnabledForMSE) {
+        sampleThreadBytesAllocated();
+        sampleThreadCPUTime();
+      }
+    }
+
     /**
      * for testing only
      */
@@ -286,17 +305,17 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     }
 
     @Override
-    public void createExecutionContextInner(@Nullable String queryId, int 
taskId, @Nullable
-        ThreadExecutionContext parentContext) {
+    public void createExecutionContextInner(@Nullable String queryId, int 
taskId,
+        ThreadExecutionContext.TaskType taskType, @Nullable 
ThreadExecutionContext parentContext) {
       _threadLocalEntry.get()._errorStatus.set(null);
       if (parentContext == null) {
         // is anchor thread
         assert queryId != null;
-        _threadLocalEntry.get().setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID,
-            ThreadExecutionContext.TaskType.UNKNOWN, Thread.currentThread());
+        _threadLocalEntry.get()
+            .setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread());
       } else {
         // not anchor thread
-        
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, 
parentContext.getTaskType(),
+        _threadLocalEntry.get().setThreadTaskStatus(queryId, taskId, 
parentContext.getTaskType(),
             parentContext.getAnchorThread());
       }
     }
@@ -306,6 +325,10 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       return _threadLocalEntry.get().getCurrentThreadTaskStatus();
     }
 
+    public CPUMemThreadLevelAccountingObjects.ThreadEntry getThreadEntry() {
+      return _threadLocalEntry.get();
+    }
+
     /**
      * clears thread accounting info once a runner/worker thread has finished 
a particular run
      */
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 7a729460b1..5a433ba370 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -57,6 +57,9 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.trace.Tracing;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -270,10 +273,14 @@ public class LeafStageTransferableBlockOperator extends 
MultiStageOperator {
   private Future<Void> startExecution() {
     ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
     ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+    ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
     return _executorService.submit(() -> {
       try {
         if (_requests.size() == 1) {
           ServerQueryRequest request = _requests.get(0);
+          ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+          Tracing.ThreadAccountantOps.setupWorker(1, 
threadResourceUsageProvider, parentContext);
+
           InstanceResponseBlock instanceResponseBlock =
               _queryExecutor.execute(request, _executorService, 
resultsBlockConsumer);
           if (queryLogger != null) {
@@ -303,7 +310,11 @@ public class LeafStageTransferableBlockOperator extends 
MultiStageOperator {
           CountDownLatch latch = new CountDownLatch(2);
           for (int i = 0; i < 2; i++) {
             ServerQueryRequest request = _requests.get(i);
+            int taskId = i;
             futures[i] = _executorService.submit(() -> {
+              ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
+              Tracing.ThreadAccountantOps.setupWorker(taskId, 
threadResourceUsageProvider, parentContext);
+
               try {
                 InstanceResponseBlock instanceResponseBlock =
                     _queryExecutor.execute(request, _executorService, 
resultsBlockConsumer);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 2690a5a7f7..16d2af4610 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -69,7 +69,7 @@ public abstract class MultiStageOperator
   // Samples resource usage of the operator. The operator should call this 
function for every block of data or
   // assuming the block holds 10000 rows or more.
   protected void sampleAndCheckInterruption() {
-    Tracing.ThreadAccountantOps.sample();
+    Tracing.ThreadAccountantOps.sampleMSE();
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
       earlyTerminate();
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 1d2d81993c..437fee82fa 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -32,6 +32,7 @@ import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
 import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -115,9 +116,9 @@ public class QueryServerEnclosure {
   }
 
   public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, 
StagePlan stagePlan,
-      Map<String, String> requestMetadataMap) {
+      Map<String, String> requestMetadataMap, ThreadExecutionContext 
parentContext) {
     return CompletableFuture.runAsync(
-        () -> _queryRunner.processQuery(workerMetadata, stagePlan, 
requestMetadataMap, null),
+        () -> _queryRunner.processQuery(workerMetadata, stagePlan, 
requestMetadataMap, parentContext),
         _queryRunner.getExecutorService());
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
new file mode 100644
index 0000000000..3aa6556b1b
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.queries;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.QueryServerEnclosure;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class QueryRunnerAccountingTest extends QueryRunnerTestBase {
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    MockInstanceDataManagerFactory factory1 = new 
MockInstanceDataManagerFactory("server1");
+    
factory1.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("a").build(),
 "a_REALTIME");
+    factory1.addSegment("a_REALTIME", QueryRunnerTest.buildRows("a_REALTIME"));
+
+    MockInstanceDataManagerFactory factory2 = new 
MockInstanceDataManagerFactory("server2");
+    
factory2.registerTable(QueryRunnerTest.SCHEMA_BUILDER.setSchemaName("b").build(),
 "b_REALTIME");
+    factory2.addSegment("b_REALTIME", QueryRunnerTest.buildRows("b_REALTIME"));
+
+    _reducerHostname = "localhost";
+    _reducerPort = QueryTestUtils.getAvailablePort();
+    Map<String, Object> reducerConfig = new HashMap<>();
+    
reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
 _reducerHostname);
+    
reducerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 _reducerPort);
+    _mailboxService = new MailboxService(_reducerHostname, _reducerPort, new 
PinotConfiguration(reducerConfig));
+    _mailboxService.start();
+
+    QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
+    server1.start();
+    // Start server1 to ensure the next server will have a different port.
+    QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
+    server2.start();
+    // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
+    // this is only use for test identifier purpose.
+    int port1 = server1.getPort();
+    int port2 = server2.getPort();
+    _servers.put(new QueryServerInstance("localhost", port1, port1), server1);
+    _servers.put(new QueryServerInstance("localhost", port2, port2), server2);
+
+    _queryEnvironment = 
QueryEnvironmentTestBase.getQueryEnvironment(_reducerPort, server1.getPort(), 
server2.getPort(),
+        factory1.getRegisteredSchemaMap(), 
factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+        null);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (QueryServerEnclosure server : _servers.values()) {
+      server.shutDown();
+    }
+    _mailboxService.shutdown();
+  }
+
+  @Test
+  void testWithDefaultThreadAccountant() {
+    Tracing.DefaultThreadResourceUsageAccountant accountant = new 
Tracing.DefaultThreadResourceUsageAccountant();
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
+
+      ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", 
false).getResultTable();
+      Assert.assertEquals(resultTable.getRows().size(), 2);
+
+      ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+      Assert.assertTrue(threadAccountant.getThreadResources().isEmpty());
+      Assert.assertTrue(threadAccountant.getQueryResources().isEmpty());
+    }
+  }
+
+  @Test
+  void testWithPerQueryAccountantFactory() {
+    HashMap<String, Object> configs = getAccountingConfig();
+
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(configs),
+            "testWithPerQueryAccountantFactory");
+
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
+
+      ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", 
false).getResultTable();
+      Assert.assertEquals(resultTable.getRows().size(), 2);
+
+      Map<String, ? extends QueryResourceTracker> resources = 
accountant.getQueryResources();
+      Assert.assertEquals(resources.size(), 1);
+      
Assert.assertTrue(resources.entrySet().iterator().next().getValue().getAllocatedBytes()
 > 0);
+    }
+  }
+
+  @Test
+  void testDisableSamplingForMSE() {
+    HashMap<String, Object> configs = getAccountingConfig();
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, 
false);
+
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(configs),
+            "testWithPerQueryAccountantFactory");
+
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
+      ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", 
false).getResultTable();
+      Assert.assertEquals(resultTable.getRows().size(), 2);
+
+      Map<String, ? extends QueryResourceTracker> resources = 
accountant.getQueryResources();
+      Assert.assertEquals(resources.size(), 1);
+      
Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(),
 0);
+    }
+  }
+
+  public static class InterruptingAccountant
+      extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
+
+    public InterruptingAccountant(PinotConfiguration config, String 
instanceId) {
+      super(config, instanceId);
+    }
+
+    @Override
+    public boolean isAnchorThreadInterrupted() {
+      return true;
+    }
+  }
+
+  @Test(expectedExceptions = EarlyTerminationException.class)
+  void testInterrupt() {
+    HashMap<String, Object> configs = getAccountingConfig();
+
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    InterruptingAccountant accountant =
+        new InterruptingAccountant(new PinotConfiguration(configs), 
"testWithPerQueryAccountantFactory");
+
+    try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
+      tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
+      queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
+    }
+  }
+
+  private static HashMap<String, Object> getAccountingConfig() {
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
+    return configs;
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index de65466725..e6d6d43424 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -61,9 +61,11 @@ import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.StringUtil;
@@ -133,7 +135,8 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
     List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
     for (int stageId = 0; stageId < stagePlans.size(); stageId++) {
       if (stageId != 0) {
-        
submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, 
stageId, requestMetadataMap));
+        
submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, 
requestId, stageId,
+            requestMetadataMap));
       }
     }
     try {
@@ -155,20 +158,22 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
   }
 
   protected List<CompletableFuture<?>> 
processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
-      int stageId, Map<String, String> requestMetadataMap) {
+      long requestId, int stageId, Map<String, String> requestMetadataMap) {
     DispatchablePlanFragment dispatchableStagePlan = 
dispatchableSubPlan.getQueryStageList().get(stageId);
     List<WorkerMetadata> stageWorkerMetadataList = 
dispatchableStagePlan.getWorkerMetadataList();
     List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
     for (Map.Entry<QueryServerInstance, List<Integer>> entry : 
dispatchableStagePlan.getServerInstanceToWorkerIdMap()
         .entrySet()) {
       QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
+      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), 
ThreadExecutionContext.TaskType.MSE);
+      ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
       List<WorkerMetadata> workerMetadataList =
           
entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
       StageMetadata stageMetadata =
           new StageMetadata(stageId, workerMetadataList, 
dispatchableStagePlan.getCustomProperties());
       StagePlan stagePlan = new 
StagePlan(dispatchableStagePlan.getPlanFragment().getFragmentRoot(), 
stageMetadata);
       for (WorkerMetadata workerMetadata : workerMetadataList) {
-        submissionStubs.add(serverEnclosure.processQuery(workerMetadata, 
stagePlan, requestMetadataMap));
+        submissionStubs.add(serverEnclosure.processQuery(workerMetadata, 
stagePlan, requestMetadataMap, parentContext));
       }
     }
     return submissionStubs;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index bab51cf21c..a3d1061dd6 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -60,6 +60,11 @@ public interface ThreadResourceUsageAccountant {
    */
   void sampleUsage();
 
+  /**
+   * Sample Usage for Multi-stage engine queries
+   */
+  void sampleUsageMSE();
+
   /**
    * special interface to aggregate usage to the stats store only once, it is 
used for response
    * ser/de threads where the thread execution context cannot be setup before 
hands as
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 59ad65eef5..1f43d5e0d7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
@@ -195,18 +197,23 @@ public class Tracing {
     public void sampleUsage() {
     }
 
+    @Override
+    public void sampleUsageMSE() {
+    }
+
     @Override
     public void updateQueryUsageConcurrently(String queryId) {
     }
 
     @Override
-    public final void createExecutionContext(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
-        ThreadExecutionContext parentContext) {
+    public final void createExecutionContext(@Nullable String queryId, int 
taskId,
+        ThreadExecutionContext.TaskType taskType, @Nullable 
ThreadExecutionContext parentContext) {
       _anchorThread.set(parentContext == null ? Thread.currentThread() : 
parentContext.getAnchorThread());
-      createExecutionContextInner(queryId, taskId, parentContext);
+      createExecutionContextInner(queryId, taskId, taskType, parentContext);
     }
 
-    public void createExecutionContextInner(String queryId, int taskId, 
ThreadExecutionContext parentContext) {
+    public void createExecutionContextInner(@Nullable String queryId, int 
taskId,
+        ThreadExecutionContext.TaskType taskType, @Nullable 
ThreadExecutionContext parentContext) {
     }
 
     @Override
@@ -259,11 +266,11 @@ public class Tracing {
     private ThreadAccountantOps() {
     }
 
-    public static void setupRunner(String queryId) {
+    public static void setupRunner(@Nonnull String queryId) {
       setupRunner(queryId, ThreadExecutionContext.TaskType.SSE);
     }
 
-    public static void setupRunner(String queryId, 
ThreadExecutionContext.TaskType taskType) {
+    public static void setupRunner(@Nonnull String queryId, 
ThreadExecutionContext.TaskType taskType) {
       Tracing.getThreadAccountant().setThreadResourceUsageProvider(new 
ThreadResourceUsageProvider());
       Tracing.getThreadAccountant()
           .createExecutionContext(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, null);
@@ -290,13 +297,24 @@ public class Tracing {
         ThreadResourceUsageProvider threadResourceUsageProvider,
         ThreadExecutionContext threadExecutionContext) {
       
Tracing.getThreadAccountant().setThreadResourceUsageProvider(threadResourceUsageProvider);
-      Tracing.getThreadAccountant().createExecutionContext(null, taskId, 
taskType, threadExecutionContext);
+      String queryId = null;
+      if (threadExecutionContext != null) {
+        queryId = threadExecutionContext.getQueryId();
+      } else {
+        LOGGER.warn("Request ID not available. ParentContext not set for query 
worker thread.");
+      }
+      Tracing.getThreadAccountant()
+          .createExecutionContext(queryId, taskId, taskType, 
threadExecutionContext);
     }
 
     public static void sample() {
       Tracing.getThreadAccountant().sampleUsage();
     }
 
+    public static void sampleMSE() {
+      Tracing.getThreadAccountant().sampleUsageMSE();
+    }
+
     public static void clear() {
       Tracing.getThreadAccountant().clear();
     }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 58905ba71f..f3ea4ecba4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -979,6 +979,10 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = 
"accounting.query.killed.metric.enabled";
     public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
+
+    public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE =
+        "accounting.enable.thread.sampling.mse.debug";
+    public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true;
   }
 
   public static class ExecutorService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to