This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 88de1b7eba Set InstanceType for OOM Protection during startup (#15374)
88de1b7eba is described below

commit 88de1b7eba63b15e6ac46d71f3c78dcc396219b2
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Tue Apr 8 05:51:56 2025 +0530

    Set InstanceType for OOM Protection during startup (#15374)
---
 .../pinot/broker/broker/helix/BaseBrokerStarter.java       |  3 ++-
 .../AggregateByQueryIdAccountantFactoryForTest.java        |  9 +++++----
 .../accounting/HeapUsagePublishingAccountantFactory.java   |  3 ++-
 .../core/accounting/PerQueryCPUMemAccountantFactory.java   | 14 +++++++-------
 .../accounting/PerQueryCPUMemAccountantFactoryForTest.java | 10 ++++++----
 .../core/accounting/ResourceManagerAccountingTest.java     |  8 +++++---
 .../tests/OOMProtectionEnabledIntegrationTest.java         |  7 -------
 .../OfflineClusterMemBasedBrokerQueryKillingTest.java      |  3 ---
 .../OfflineClusterMemBasedServerQueryKillingTest.java      |  3 ---
 .../tests/OfflineClusterServerCPUTimeQueryKillingTest.java |  3 ---
 .../integration/tests/WindowResourceAccountingTest.java    |  4 ----
 .../query/runtime/operator/MultiStageAccountingTest.java   |  4 +++-
 .../query/runtime/queries/QueryRunnerAccountingTest.java   | 12 +++++++-----
 .../pinot/server/starter/helix/BaseServerStarter.java      |  3 ++-
 .../pinot/spi/accounting/ThreadAccountantFactory.java      |  3 ++-
 .../src/main/java/org/apache/pinot/spi/trace/Tracing.java  |  6 ++++--
 .../java/org/apache/pinot/spi/utils/CommonConstants.java   |  4 ----
 17 files changed, 45 insertions(+), 54 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 7fbafc6bea..464e7dc48e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -412,7 +412,8 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
         
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
             
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
     Tracing.ThreadAccountantOps.initializeThreadAccountant(
-        _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId);
+        _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
+        org.apache.pinot.spi.config.instance.InstanceType.BROKER);
 
     String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
     if (controllerUrl != null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java
index 2a74362494..06c96169b8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java
@@ -25,13 +25,14 @@ import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 public class AggregateByQueryIdAccountantFactoryForTest implements 
ThreadAccountantFactory {
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId) {
-    return new AggregateByQueryIdAccountant(config, instanceId);
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
+    return new AggregateByQueryIdAccountant(config, instanceId, instanceType);
   }
 
   public static class QueryResourceTrackerImpl implements QueryResourceTracker 
{
@@ -75,8 +76,8 @@ public class AggregateByQueryIdAccountantFactoryForTest 
implements ThreadAccount
       extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
     Map<String, QueryResourceTrackerImpl> _queryMemUsage = new 
ConcurrentHashMap<>();
 
-    public AggregateByQueryIdAccountant(PinotConfiguration config, String 
instanceId) {
-      super(config, instanceId);
+    public AggregateByQueryIdAccountant(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
+      super(config, instanceId, instanceType);
     }
 
     @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
index e29d33d044..7e82550286 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 public class HeapUsagePublishingAccountantFactory implements 
ThreadAccountantFactory {
 
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId) {
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
     int period = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS,
         CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD);
     return new HeapUsagePublishingResourceUsageAccountant(period);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 7052b077f6..4f1fa0087f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -63,8 +63,8 @@ import org.slf4j.LoggerFactory;
 public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory {
 
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId) {
-    return new PerQueryCPUMemResourceUsageAccountant(config, instanceId);
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
+    return new PerQueryCPUMemResourceUsageAccountant(config, instanceId, 
instanceType);
   }
 
   public static class PerQueryCPUMemResourceUsageAccountant extends 
Tracing.DefaultThreadResourceUsageAccountant {
@@ -135,7 +135,10 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     // instance id of the current instance, for logging purpose
     private final String _instanceId;
 
-    public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, 
String instanceId) {
+    private final InstanceType _instanceType;
+
+    public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, 
String instanceId,
+        InstanceType instanceType) {
 
       LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
       _config = config;
@@ -155,6 +158,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       LOGGER.info("cpuSamplingConfig: {}, memorySamplingConfig: {}",
           cpuSamplingConfig, memorySamplingConfig);
 
+      _instanceType = instanceType;
       _isThreadCPUSamplingEnabled = cpuSamplingConfig && 
threadCpuTimeMeasurementEnabled;
       _isThreadMemorySamplingEnabled = memorySamplingConfig && 
threadMemoryMeasurementEnabled;
       LOGGER.info("_isThreadCPUSamplingEnabled: {}, 
_isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
@@ -644,10 +648,6 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
               CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
 
-      private final InstanceType _instanceType =
-          
InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
-          
CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString()));
-
       private long _usedBytes;
       private int _sleepTime;
       private int _numQueriesKilledConsecutively = 0;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
index 3e6c9004a8..d20f068a36 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.pinot.core.query.utils.QueryIdUtils;
 import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -30,14 +31,15 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  */
 public class PerQueryCPUMemAccountantFactoryForTest implements 
ThreadAccountantFactory {
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId) {
-    return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, 
instanceId);
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
+    return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, 
instanceId, instanceType);
   }
 
   public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest
       extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
-    public 
PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration 
config, String instanceId) {
-      super(config, instanceId);
+    public 
PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration 
config, String instanceId,
+        InstanceType instanceType) {
+      super(config, instanceId, instanceType);
     }
 
     public void postAggregation(Map<String, AggregatedStats> 
aggregatedUsagePerActiveQuery) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index d75fa73c6d..0e304b9e03 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -60,6 +60,7 @@ import 
org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
@@ -284,7 +285,7 @@ public class ResourceManagerAccountingTest {
     PinotConfiguration config = getConfig(20, 2, configs);
     ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testSelect");
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testSelect", InstanceType.SERVER);
 
     CountDownLatch latch = new CountDownLatch(100);
     AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
@@ -353,7 +354,7 @@ public class ResourceManagerAccountingTest {
     PinotConfiguration config = getConfig(20, 2, configs);
     ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testGroupBy");
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testGroupBy", InstanceType.SERVER);
 
     CountDownLatch latch = new CountDownLatch(100);
     AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
@@ -409,7 +410,8 @@ public class ResourceManagerAccountingTest {
     PinotConfiguration config = getConfig(2, 2, configs);
     ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testJsonIndexExtractMapOOM");
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testJsonIndexExtractMapOOM",
+        InstanceType.SERVER);
 
     Supplier<String> randomJsonValue = () -> {
       Random random = new Random();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
index 2bb9efa851..68c3a8a421 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests;
 
 import java.io.File;
 import java.util.List;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -41,9 +40,6 @@ public class OOMProtectionEnabledIntegrationTest extends 
BaseClusterIntegrationT
     configuration.setProperty(
         CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
         "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
-    configuration.setProperty(
-        CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
-        InstanceType.BROKER.toString());
     
configuration.setProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
   }
 
@@ -54,9 +50,6 @@ public class OOMProtectionEnabledIntegrationTest extends 
BaseClusterIntegrationT
     configuration.setProperty(
         CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
         "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
-    configuration.setProperty(
-        CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
-        InstanceType.SERVER.toString());
     
configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
index 2422d778c9..41a5edaeb8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
@@ -39,7 +39,6 @@ import org.apache.log4j.LogManager;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -155,8 +154,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest 
extends BaseClusterInt
         + 
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.40f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + 
CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, 
0.0025f);
-    brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
-        + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, 
InstanceType.BROKER);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 
1.1f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 70a87dfcad..29de87eec2 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -38,7 +38,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -179,8 +178,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest 
extends BaseClusterInt
         + 
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.0f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + 
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.60f);
-    brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
-        + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, 
InstanceType.BROKER);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 
1.1f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
index 9854943198..7f3e6a518c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
@@ -38,7 +38,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -178,8 +177,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest 
extends BaseClusterInte
         + 
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.2f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + 
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 1.1f);
-    brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
-        + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, 
InstanceType.BROKER);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
         + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 
1.1f);
     brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "."
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
index f1ec44f794..d511787f8e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
@@ -28,7 +28,6 @@ import 
org.apache.pinot.core.accounting.AggregateByQueryIdAccountantFactoryForTe
 import org.apache.pinot.integration.tests.window.utils.WindowFunnelUtils;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
@@ -66,9 +65,6 @@ public class WindowResourceAccountingTest extends 
BaseClusterIntegrationTest {
   }
 
   protected void overrideBrokerConf(PinotConfiguration brokerConf) {
-    brokerConf.setProperty(
-        CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
-        InstanceType.BROKER);
     brokerConf.setProperty(
         CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
         AggregateByQueryIdAccountantFactoryForTest.class.getCanonicalName());
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index 820b9b0e6b..0aa9f386c6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -42,6 +42,7 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -90,7 +91,8 @@ public class MultiStageAccountingTest implements ITest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, 
true);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(new 
PinotConfiguration(configs), "testGroupBy");
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(new 
PinotConfiguration(configs), "testGroupBy",
+        InstanceType.SERVER);
 
     // Setup Thread Context
     Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", 
ThreadExecutionContext.TaskType.MSE);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
index 3c6d809626..28353ead2c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.Tracing;
@@ -112,7 +113,7 @@ public class QueryRunnerAccountingTest extends 
QueryRunnerTestBase {
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
     PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
         new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(configs),
-            "testWithPerQueryAccountantFactory");
+            "testWithPerQueryAccountantFactory", InstanceType.SERVER);
 
     try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
       tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
@@ -134,7 +135,7 @@ public class QueryRunnerAccountingTest extends 
QueryRunnerTestBase {
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
     PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
         new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(configs),
-            "testWithPerQueryAccountantFactory");
+            "testWithPerQueryAccountantFactory", InstanceType.SERVER);
 
     try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
       tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
@@ -150,8 +151,8 @@ public class QueryRunnerAccountingTest extends 
QueryRunnerTestBase {
   public static class InterruptingAccountant
       extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
 
-    public InterruptingAccountant(PinotConfiguration config, String 
instanceId) {
-      super(config, instanceId);
+    public InterruptingAccountant(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
+      super(config, instanceId, instanceType);
     }
 
     @Override
@@ -166,7 +167,8 @@ public class QueryRunnerAccountingTest extends 
QueryRunnerTestBase {
 
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
     InterruptingAccountant accountant =
-        new InterruptingAccountant(new PinotConfiguration(configs), 
"testWithPerQueryAccountantFactory");
+        new InterruptingAccountant(new PinotConfiguration(configs), 
"testWithPerQueryAccountantFactory",
+            InstanceType.SERVER);
 
     try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, 
Mockito.CALLS_REAL_METHODS)) {
       tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 847b1ae282..c832600861 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -678,7 +678,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
     // initialize the thread accountant for query killing
     Tracing.ThreadAccountantOps.initializeThreadAccountant(
-        _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId);
+        _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
+        org.apache.pinot.spi.config.instance.InstanceType.SERVER);
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
index f3755c5569..b208fa6a18 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pinot.spi.accounting;
 
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 public interface ThreadAccountantFactory {
-  ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId);
+  ThreadResourceUsageAccountant init(PinotConfiguration config, String 
instanceId, InstanceType instanceType);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 4b5ea93c79..964a64bad1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -315,7 +316,8 @@ public class Tracing {
       Tracing.getThreadAccountant().clear();
     }
 
-    public static void initializeThreadAccountant(PinotConfiguration config, 
String instanceId) {
+    public static void initializeThreadAccountant(PinotConfiguration config, 
String instanceId,
+        InstanceType instanceType) {
       String factoryName = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
       if (factoryName == null) {
         LOGGER.warn("No thread accountant factory provided, using default 
implementation");
@@ -324,7 +326,7 @@ public class Tracing {
         try {
           ThreadAccountantFactory threadAccountantFactory =
               (ThreadAccountantFactory) 
Class.forName(factoryName).getDeclaredConstructor().newInstance();
-          boolean registered = 
Tracing.register(threadAccountantFactory.init(config, instanceId));
+          boolean registered = 
Tracing.register(threadAccountantFactory.init(config, instanceId, 
instanceType));
           LOGGER.info("Using accountant provided by {}", factoryName);
           if (!registered) {
             LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is 
already registered.", factoryName);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5f6b186f70..bcbcc49681 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.query.QueryThreadContext;
 
 
@@ -1257,9 +1256,6 @@ public class CommonConstants {
     public static final String CONFIG_OF_GC_BACKOFF_COUNT = 
"accounting.gc.backoff.count";
     public static final int DEFAULT_GC_BACKOFF_COUNT = 5;
 
-    public static final String CONFIG_OF_INSTANCE_TYPE = 
"accounting.instance.type";
-    public static final InstanceType DEFAULT_CONFIG_OF_INSTANCE_TYPE = 
InstanceType.SERVER;
-
     public static final String CONFIG_OF_GC_WAIT_TIME_MS = 
"accounting.gc.wait.time.ms";
     public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to