This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b13f61a4 Dynamic PerQueryCPUMemAccountant Config on Servers  (#16219)
29b13f61a4 is described below

commit 29b13f61a4aeeaa3d4b1aabc18dc32bf8d7a0836
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Tue Jul 1 17:09:41 2025 +0530

    Dynamic PerQueryCPUMemAccountant Config on Servers  (#16219)
    
    * Checkpoint
    
    * Register change handler
    
    * Fix bugs. Manually tested
    
    * Checkstyle
    
    * Tests
    
    * Add pre-check that values are default
    
    * Undo typo fix
---
 .../PerQueryCPUMemAccountantFactory.java           | 265 +++++++--------
 .../pinot/core/accounting/QueryMonitorConfig.java  | 369 +++++++++++++++++++++
 .../PerQueryCPUMemAccountantFactoryTest.java       | 301 +++++++++++++++++
 .../server/starter/helix/BaseServerStarter.java    |   5 +
 .../accounting/ThreadResourceUsageAccountant.java  |   6 +
 5 files changed, 799 insertions(+), 147 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index f733e81ba7..aa387807d5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.AbstractMetrics;
 import org.apache.pinot.common.metrics.BrokerGauge;
@@ -47,6 +48,7 @@ import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -87,24 +89,24 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       return thread;
     });
 
-    private final PinotConfiguration _config;
+    protected final PinotConfiguration _config;
 
     // the map to track stats entry for each thread, the entry will 
automatically be added when one calls
     // setThreadResourceUsageProvider on the thread, including but not limited 
to
     // server worker thread, runner thread, broker jetty thread, or broker 
netty thread
-    private final ConcurrentHashMap<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadEntriesMap
+    protected final ConcurrentHashMap<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadEntriesMap
         = new ConcurrentHashMap<>();
 
     // For one time concurrent update of stats. This is to provide stats 
collection for parts that are not
     // performance sensitive and query_id is not known beforehand (e.g. broker 
inbound netty thread)
-    private final ConcurrentHashMap<String, Long> 
_concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
-    private final ConcurrentHashMap<String, Long> 
_concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, Long> 
_concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, Long> 
_concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
 
     // for stats aggregation of finished (worker) threads when the runner is 
still running
-    private final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new 
HashMap<>();
-    private final HashMap<String, Long> _finishedTaskMemStatsAggregator = new 
HashMap<>();
+    protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = 
new HashMap<>();
+    protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = 
new HashMap<>();
 
-    private final ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> 
_threadLocalEntry
+    protected final 
ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadLocalEntry
         = ThreadLocal.withInitial(() -> {
           CPUMemThreadLevelAccountingObjects.ThreadEntry ret =
               new CPUMemThreadLevelAccountingObjects.ThreadEntry();
@@ -115,23 +117,23 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     );
 
     // track thread cpu time
-    private final boolean _isThreadCPUSamplingEnabled;
+    protected final boolean _isThreadCPUSamplingEnabled;
 
     // track memory usage
-    private final boolean _isThreadMemorySamplingEnabled;
+    protected final boolean _isThreadMemorySamplingEnabled;
 
     // is sampling allowed for MSE queries
-    private final boolean _isThreadSamplingEnabledForMSE;
+    protected final boolean _isThreadSamplingEnabledForMSE;
 
-    private final Set<String> _inactiveQuery;
+    protected final Set<String> _inactiveQuery;
 
     // the periodical task that aggregates and preempts queries
-    private final WatcherTask _watcherTask;
+    protected final WatcherTask _watcherTask;
 
     // instance id of the current instance, for logging purpose
-    private final String _instanceId;
+    protected final String _instanceId;
 
-    private final InstanceType _instanceType;
+    protected final InstanceType _instanceType;
 
     public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, 
String instanceId,
         InstanceType instanceType) {
@@ -263,13 +265,6 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
         @Nullable ThreadExecutionContext parentContext) {
     }
 
-    /**
-     * for testing only
-     */
-    public int getEntryCount() {
-      return _threadEntriesMap.size();
-    }
-
     @Override
     @Deprecated
     public void setThreadResourceUsageProvider(ThreadResourceUsageProvider 
threadResourceUsageProvider) {
@@ -354,11 +349,20 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       threadEntry.setToIdle();
     }
 
+    public WatcherTask getWatcherTask() {
+      return _watcherTask;
+    }
+
     @Override
     public void startWatcherTask() {
       EXECUTOR_SERVICE.submit(_watcherTask);
     }
 
+    @Override
+    public PinotClusterConfigChangeListener getClusterConfigChangeListener() {
+      return _watcherTask;
+    }
+
     /**
      * remove in active queries from _finishedTaskStatAggregator
      */
@@ -577,95 +581,13 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     /**
      * A watcher task to perform usage sampling, aggregation, and query 
preemption
      */
-    class WatcherTask implements Runnable {
-
-      // max heap usage, Xmx
-      private final long _maxHeapSize = 
MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
-
-      // don't kill a query if its memory footprint is below some ratio of 
_maxHeapSize
-      private final long _minMemoryFootprintForKill = (long) (_maxHeapSize
-          * 
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
-          CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO));
-
-      // kill all queries if heap usage exceeds this
-      private final long _panicLevel = (long) (_maxHeapSize
-          * 
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
-          CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO));
-
-      // kill the most expensive query if heap usage exceeds this
-      private final long _criticalLevel = (long) (_maxHeapSize
-          * 
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
-          CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
-
-      // if after gc the heap usage is still above this, kill the most 
expensive query
-      // use this to prevent heap size oscillation and repeatedly triggering gc
-      private final long _criticalLevelAfterGC = _criticalLevel - (long) 
(_maxHeapSize
-          * 
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
-          
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));
-
-      // trigger gc if consecutively kill more than some number of queries
-      // set this to 0 to always trigger gc before killing a query to give gc 
a second chance
-      // as would minimize the chance of false positive killing in some 
usecases
-      // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for 
some gc algorithms
-      private final int _gcBackoffCount =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
-              CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
-
-      // start to sample more frequently if heap usage exceeds this
-      private final long _alarmingLevel =
-          (long) (_maxHeapSize
-              * 
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
-              
CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO));
-
-      // normal sleep time
-      private final int _normalSleepTime =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
-              CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
-
-      // wait for gc to complete, according to system.gc() javadoc, when 
control returns from the method call,
-      // the Java Virtual Machine has made a best effort to reclaim space from 
all discarded objects.
-      // Therefore, we default this to 0.
-      // Tested with Shenandoah GC and G1GC, with 
-XX:+ExplicitGCInvokesConcurrent
-      private final int _gcWaitTime =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
-              CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
-
-      // alarming sleep time denominator, should be > 1 to sample more 
frequent at alarming level
-      private final int _alarmingSleepTimeDenominator =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR,
-              CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);
-
-      // alarming sleep time
-      private final int _alarmingSleepTime = _normalSleepTime / 
_alarmingSleepTimeDenominator;
-
-      // the framework would not commit to kill any query if this is disabled
-      private final boolean _oomKillQueryEnabled =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
-              
CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY);
-
-      // if we want to publish the heap usage
-      private final boolean _publishHeapUsageMetric =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE,
-              CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE);
-
-      // if we want kill query based on CPU time
-      private final boolean _isCPUTimeBasedKillingEnabled =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED,
-              
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED) && 
_isThreadCPUSamplingEnabled;
-
-      // CPU time based killing threshold
-      private final long _cpuTimeBasedKillingThresholdNS =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS,
-              
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 
1000_000L;
-
-      //
-      private final boolean _isQueryKilledMetricEnabled =
-          
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
-              CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
-
-      private long _usedBytes;
-      private int _sleepTime;
-      private int _numQueriesKilledConsecutively = 0;
+    public class WatcherTask implements Runnable, 
PinotClusterConfigChangeListener {
+
+      protected AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new 
AtomicReference<>();
+
+      protected long _usedBytes;
+      protected int _sleepTime;
+      protected int _numQueriesKilledConsecutively = 0;
       protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
       private TriggeringLevel _triggeringLevel;
 
@@ -677,6 +599,9 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       private final AbstractMetrics.Gauge _memoryUsageGauge;
 
       WatcherTask() {
+        _queryMonitorConfig.set(new QueryMonitorConfig(_config, 
MEMORY_MX_BEAN.getHeapMemoryUsage().getMax()));
+        logQueryMonitorConfig();
+
         switch (_instanceType) {
           case SERVER:
             _metrics = ServerMetrics.get();
@@ -703,29 +628,64 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
         }
       }
 
+      public QueryMonitorConfig getQueryMonitorConfig() {
+        return _queryMonitorConfig.get();
+      }
+
       @Override
-      public void run() {
+      public synchronized void onChange(Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
+        // Filter configs that have CommonConstants.PREFIX_SCHEDULER_PREFIX
+        Set<String> filteredChangedConfigs =
+            changedConfigs.stream().filter(config -> 
config.startsWith(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX))
+                .map(config -> 
config.replace(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".", ""))
+                .collect(Collectors.toSet());
+
+        if (filteredChangedConfigs.isEmpty()) {
+          LOGGER.debug("No relevant configs changed, skipping update for 
QueryMonitorConfig.");
+          return;
+        }
+
+        Map<String, String> filteredClusterConfigs = 
clusterConfigs.entrySet().stream()
+            .filter(entry -> 
entry.getKey().startsWith(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX)).collect(
+                Collectors.toMap(
+                    entry -> 
entry.getKey().replace(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".", ""),
+                    Map.Entry::getValue));
+
+        QueryMonitorConfig oldConfig = _queryMonitorConfig.get();
+        QueryMonitorConfig newConfig =
+            new QueryMonitorConfig(oldConfig, filteredChangedConfigs, 
filteredClusterConfigs);
+        _queryMonitorConfig.set(newConfig);
+        logQueryMonitorConfig();
+      }
+
+      private void logQueryMonitorConfig() {
+        QueryMonitorConfig queryMonitorConfig = _queryMonitorConfig.get();
         // Log info for the accountant configs
-        LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant.");
-        LOGGER.info("Xmx is {}", _maxHeapSize);
+        LOGGER.info("Updated Configuration for Query Monitor");
+        LOGGER.info("Xmx is {}", queryMonitorConfig.getMaxHeapSize());
         LOGGER.info("_instanceType is {}", _instanceType);
-        LOGGER.info("_alarmingLevel of on heap memory is {}", _alarmingLevel);
-        LOGGER.info("_criticalLevel of on heap memory is {}", _criticalLevel);
-        LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", 
_criticalLevelAfterGC);
-        LOGGER.info("_panicLevel of on heap memory is {}", _panicLevel);
-        LOGGER.info("_gcBackoffCount is {}", _gcBackoffCount);
-        LOGGER.info("_gcWaitTime is {}", _gcWaitTime);
-        LOGGER.info("_normalSleepTime is {}", _normalSleepTime);
-        LOGGER.info("_alarmingSleepTime is {}", _alarmingSleepTime);
-        LOGGER.info("_oomKillQueryEnabled: {}", _oomKillQueryEnabled);
-        LOGGER.info("_minMemoryFootprintForKill: {}", 
_minMemoryFootprintForKill);
+        LOGGER.info("_alarmingLevel of on heap memory is {}", 
queryMonitorConfig.getAlarmingLevel());
+        LOGGER.info("_criticalLevel of on heap memory is {}", 
queryMonitorConfig.getCriticalLevel());
+        LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", 
queryMonitorConfig.getCriticalLevelAfterGC());
+        LOGGER.info("_panicLevel of on heap memory is {}", 
queryMonitorConfig.getPanicLevel());
+        LOGGER.info("_gcBackoffCount is {}", 
queryMonitorConfig.getGcBackoffCount());
+        LOGGER.info("_gcWaitTime is {}", queryMonitorConfig.getGcWaitTime());
+        LOGGER.info("_normalSleepTime is {}", 
queryMonitorConfig.getNormalSleepTime());
+        LOGGER.info("_alarmingSleepTime is {}", 
queryMonitorConfig.getAlarmingSleepTime());
+        LOGGER.info("_oomKillQueryEnabled: {}", 
queryMonitorConfig.isOomKillQueryEnabled());
+        LOGGER.info("_minMemoryFootprintForKill: {}", 
queryMonitorConfig.getMinMemoryFootprintForKill());
         LOGGER.info("_isCPUTimeBasedKillingEnabled: {}, 
_cpuTimeBasedKillingThresholdNS: {}",
-            _isCPUTimeBasedKillingEnabled, _cpuTimeBasedKillingThresholdNS);
+            queryMonitorConfig.isCpuTimeBasedKillingEnabled(), 
queryMonitorConfig.getCpuTimeBasedKillingThresholdNS());
+      }
 
+      @Override
+      public void run() {
         while (true) {
+          QueryMonitorConfig config = _queryMonitorConfig.get();
+
           LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
           _triggeringLevel = TriggeringLevel.Normal;
-          _sleepTime = _normalSleepTime;
+          _sleepTime = config.getNormalSleepTime();
           _aggregatedUsagePerActiveQuery = null;
           try {
             // Get the metrics used for triggering the kill
@@ -750,7 +710,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
             LOGGER.debug("_threadEntriesMap size: {}", 
_threadEntriesMap.size());
 
             // Publish server heap usage metrics
-            if (_publishHeapUsageMetric) {
+            if (config.isPublishHeapUsageMetric()) {
               _metrics.setValueOfGlobalGauge(_memoryUsageGauge, _usedBytes);
             }
             // Clean inactive query stats
@@ -771,14 +731,15 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
        * @return if panic mode is triggered
        */
       private boolean outOfMemoryPanicTrigger() {
+        long panicLevel = _queryMonitorConfig.get().getPanicLevel();
         // at this point we assume we have tried to kill some queries and the 
gc kicked in
         // we have no choice but to kill all queries
-        if (_usedBytes >= _panicLevel) {
+        if (_usedBytes >= panicLevel) {
           killAllQueries();
           _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
           _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1);
           LOGGER.error("Heap used bytes {}, greater than _panicLevel {}, 
Killed all queries and triggered gc!",
-              _usedBytes, _panicLevel);
+              _usedBytes, panicLevel);
           // call aggregate here as will throw exception and
           aggregate(false);
           return true;
@@ -791,15 +752,17 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
        * Triggers should be mutually exclusive and evaluated following level 
high -> low
        */
       private void evalTriggers() {
-        if (_isCPUTimeBasedKillingEnabled) {
+        QueryMonitorConfig config = _queryMonitorConfig.get();
+
+        if (config.isCpuTimeBasedKillingEnabled()) {
           _triggeringLevel = TriggeringLevel.CPUTimeBasedKilling;
         }
 
-        if (_usedBytes > _criticalLevel) {
+        if (_usedBytes > config.getCriticalLevel()) {
           _triggeringLevel = TriggeringLevel.HeapMemoryCritical;
           _metrics.addMeteredGlobalValue(_heapMemoryCriticalExceededMeter, 1);
-        } else if (_usedBytes > _alarmingLevel) {
-          _sleepTime = _alarmingSleepTime;
+        } else if (_usedBytes > config.getAlarmingLevel()) {
+          _sleepTime = config.getAlarmingSleepTime();
           // For debugging
           _triggeringLevel = (IS_DEBUG_MODE_ENABLED && _triggeringLevel == 
TriggeringLevel.Normal)
               ? TriggeringLevel.HeapMemoryAlarmingVerbose : _triggeringLevel;
@@ -812,7 +775,8 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       private void triggeredActions() {
         switch (_triggeringLevel) {
           case HeapMemoryCritical:
-            LOGGER.warn("Heap used bytes {} exceeds critical level {}", 
_usedBytes, _criticalLevel);
+            LOGGER.warn("Heap used bytes {} exceeds critical level {}", 
_usedBytes,
+                _queryMonitorConfig.get().getCriticalLevel());
             killMostExpensiveQuery();
             break;
           case CPUTimeBasedKilling:
@@ -837,7 +801,9 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       }
 
       void killAllQueries() {
-        if (_oomKillQueryEnabled) {
+        QueryMonitorConfig config = _queryMonitorConfig.get();
+
+        if (config.isOomKillQueryEnabled()) {
           int killedCount = 0;
           for (Map.Entry<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : 
_threadEntriesMap.entrySet()) {
             CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
entry.getValue();
@@ -849,11 +815,11 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
               killedCount += 1;
             }
           }
-          if (_isQueryKilledMetricEnabled) {
+          if (config.isQueryKilledMetricEnabled()) {
             _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
           }
           try {
-            Thread.sleep(_normalSleepTime);
+            Thread.sleep(config.getNormalSleepTime());
           } catch (InterruptedException ignored) {
           }
           // In this extreme case we directly trigger system.gc
@@ -868,19 +834,21 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
        * use XX:+ExplicitGCInvokesConcurrent to avoid a full gc when system.gc 
is triggered
        */
       private void killMostExpensiveQuery() {
-        if (!_aggregatedUsagePerActiveQuery.isEmpty() && 
_numQueriesKilledConsecutively >= _gcBackoffCount) {
+        QueryMonitorConfig config = _queryMonitorConfig.get();
+        if (!_aggregatedUsagePerActiveQuery.isEmpty()
+            && _numQueriesKilledConsecutively >= config.getGcBackoffCount()) {
           _numQueriesKilledConsecutively = 0;
           System.gc();
           try {
-            Thread.sleep(_gcWaitTime);
+            Thread.sleep(config.getGcWaitTime());
           } catch (InterruptedException ignored) {
           }
           _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
-          if (_usedBytes < _criticalLevelAfterGC) {
+          if (_usedBytes < config.getCriticalLevelAfterGC()) {
             return;
           }
-          LOGGER.error("After GC, heap used bytes {} still exceeds 
_criticalLevelAfterGC level {}",
-              _usedBytes, _criticalLevelAfterGC);
+          LOGGER.error("After GC, heap used bytes {} still exceeds 
_criticalLevelAfterGC level {}", _usedBytes,
+              config.getCriticalLevelAfterGC());
         }
         if (!(_isThreadMemorySamplingEnabled || _isThreadCPUSamplingEnabled)) {
           LOGGER.warn("But unable to kill query because neither memory nor cpu 
tracking is enabled");
@@ -895,7 +863,8 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
         if (_isThreadMemorySamplingEnabled) {
           maxUsageTuple = 
Collections.max(_aggregatedUsagePerActiveQuery.values(),
               Comparator.comparing(AggregatedStats::getAllocatedBytes));
-          boolean shouldKill = _oomKillQueryEnabled && 
maxUsageTuple._allocatedBytes > _minMemoryFootprintForKill;
+          boolean shouldKill = config.isOomKillQueryEnabled()
+              && maxUsageTuple._allocatedBytes > 
config.getMinMemoryFootprintForKill();
           if (shouldKill) {
             maxUsageTuple._exceptionAtomicReference
                 .set(new RuntimeException(String.format(
@@ -903,7 +872,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
                     maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), 
_instanceType, _instanceId)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
             logTerminatedQuery(maxUsageTuple, _usedBytes);
-          } else if (!_oomKillQueryEnabled) {
+          } else if (!config.isOomKillQueryEnabled()) {
             LOGGER.warn("Query {} got picked because using {} bytes of memory, 
actual kill committed false "
                     + "because oomKillQueryEnabled is false",
                 maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
@@ -915,16 +884,18 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       }
 
       private void killCPUTimeExceedQueries() {
+        QueryMonitorConfig config = _queryMonitorConfig.get();
+
         for (Map.Entry<String, AggregatedStats> entry : 
_aggregatedUsagePerActiveQuery.entrySet()) {
           AggregatedStats value = entry.getValue();
-          if (value._cpuNS > _cpuTimeBasedKillingThresholdNS) {
+          if (value._cpuNS > config.getCpuTimeBasedKillingThresholdNS()) {
             LOGGER.error("Current task status recorded is {}. Query {} got 
picked because using {} ns of cpu time,"
                     + " greater than threshold {}", _threadEntriesMap, 
value._queryId, value.getCpuTimeNs(),
-                _cpuTimeBasedKillingThresholdNS);
+                config.getCpuTimeBasedKillingThresholdNS());
             value._exceptionAtomicReference.set(new RuntimeException(
                 String.format("Query %s got killed on %s: %s because using %d "
                         + "CPU time exceeding limit of %d ns CPU time", 
value._queryId, _instanceType, _instanceId,
-                    value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS)));
+                    value.getCpuTimeNs(), 
config.getCpuTimeBasedKillingThresholdNS())));
             interruptRunnerThread(value.getAnchorThread());
             logTerminatedQuery(value, _usedBytes);
           }
@@ -934,7 +905,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
 
       private void interruptRunnerThread(Thread thread) {
         thread.interrupt();
-        if (_isQueryKilledMetricEnabled) {
+        if (_queryMonitorConfig.get().isQueryKilledMetricEnabled()) {
           _metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
         }
         _numQueriesKilledConsecutively += 1;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
new file mode 100644
index 0000000000..ba4b05aa81
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class QueryMonitorConfig {
+  private final long _maxHeapSize;
+
+  // don't kill a query if its memory footprint is below some ratio of 
_maxHeapSize
+  private final long _minMemoryFootprintForKill;
+
+  // kill all queries if heap usage exceeds this
+  private final long _panicLevel;
+
+  // kill the most expensive query if heap usage exceeds this
+  private final long _criticalLevel;
+
+  // if after gc the heap usage is still above this, kill the most expensive 
query
+  // use this to prevent heap size oscillation and repeatedly triggering gc
+  private final long _criticalLevelAfterGC;
+
+  // trigger gc if consecutively kill more than some number of queries
+  // set this to 0 to always trigger gc before killing a query to give gc a 
second chance
+  // as would minimize the chance of false positive killing in some usecases
+  // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for 
some gc algorithms
+  private final int _gcBackoffCount;
+
+  // start to sample more frequently if heap usage exceeds this
+  private final long _alarmingLevel;
+
+  // normal sleep time
+  private final int _normalSleepTime;
+
+  // wait for gc to complete, according to system.gc() javadoc, when control 
returns from the method call,
+  // the Java Virtual Machine has made a best effort to reclaim space from all 
discarded objects.
+  // Therefore, we default this to 0.
+  // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
+  private final int _gcWaitTime;
+
+  // alarming sleep time denominator, should be > 1 to sample more frequent at 
alarming level
+  private final int _alarmingSleepTimeDenominator;
+
+  // alarming sleep time
+  private final int _alarmingSleepTime;
+
+  // the framework would not commit to kill any query if this is disabled
+  private final boolean _oomKillQueryEnabled;
+
+  // if we want to publish the heap usage
+  private final boolean _publishHeapUsageMetric;
+
+  // if we want kill query based on CPU time
+  private final boolean _isCPUTimeBasedKillingEnabled;
+
+  // CPU time based killing threshold
+  private final long _cpuTimeBasedKillingThresholdNS;
+
+  private final boolean _isQueryKilledMetricEnabled;
+
+  public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) {
+    _maxHeapSize = maxHeapSize;
+
+    _minMemoryFootprintForKill = (long) (maxHeapSize * config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
+        CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO));
+
+    _panicLevel =
+        (long) (maxHeapSize * 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
+            CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO));
+
+    // kill the most expensive query if heap usage exceeds this
+    _criticalLevel =
+        (long) (maxHeapSize * 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
+            
CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
+
+    _criticalLevelAfterGC = _criticalLevel - (long) (maxHeapSize * 
config.getProperty(
+        
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
+        
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));
+
+    _gcBackoffCount = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
+        CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
+
+    _alarmingLevel =
+        (long) (maxHeapSize * 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
+            
CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO));
+
+    _normalSleepTime = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
+        CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
+
+    _gcWaitTime = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
+        CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
+
+    _alarmingSleepTimeDenominator = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR,
+        CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);
+
+    _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator;
+
+    _oomKillQueryEnabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
+        
CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY);
+
+    _publishHeapUsageMetric = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE,
+        CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE);
+
+    _isCPUTimeBasedKillingEnabled =
+        
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED,
+            CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED);
+
+    _cpuTimeBasedKillingThresholdNS =
+        
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS,
+            
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 
1000_000L;
+
+    _isQueryKilledMetricEnabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
+        CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
+  }
+
+  QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, 
Map<String, String> clusterConfigs) {
+    _maxHeapSize = oldConfig._maxHeapSize;
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) {
+        _minMemoryFootprintForKill =
+            (long) (_maxHeapSize * 
CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO);
+      } else {
+        _minMemoryFootprintForKill = (long) (_maxHeapSize * Double.parseDouble(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)));
+      }
+    } else {
+      _minMemoryFootprintForKill = oldConfig._minMemoryFootprintForKill;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)) {
+        _panicLevel = (long) (_maxHeapSize * 
CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO);
+      } else {
+        _panicLevel = (long) (_maxHeapSize * Double.parseDouble(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)));
+      }
+    } else {
+      _panicLevel = oldConfig._panicLevel;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)) {
+        _criticalLevel = (long) (_maxHeapSize * 
CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO);
+      } else {
+        _criticalLevel = (long) (_maxHeapSize * Double.parseDouble(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)));
+      }
+    } else {
+      _criticalLevel = oldConfig._criticalLevel;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC))
 {
+        _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize
+            * 
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC);
+      } else {
+        _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize * 
Double.parseDouble(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)));
+      }
+    } else {
+      _criticalLevelAfterGC = oldConfig._criticalLevelAfterGC;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) {
+        _gcBackoffCount = CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT;
+      } else {
+        _gcBackoffCount = 
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT));
+      }
+    } else {
+      _gcBackoffCount = oldConfig._gcBackoffCount;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) {
+        _alarmingLevel = (long) (_maxHeapSize * 
CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO);
+      } else {
+        _alarmingLevel = (long) (_maxHeapSize * Double.parseDouble(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)));
+      }
+    } else {
+      _alarmingLevel = oldConfig._alarmingLevel;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)) {
+      if (clusterConfigs == null || 
!clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS))
 {
+        _normalSleepTime = CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS;
+      } else {
+        _normalSleepTime = 
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS));
+      }
+    } else {
+      _normalSleepTime = oldConfig._normalSleepTime;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)) 
{
+      if (clusterConfigs == null || 
!clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS))
 {
+        _gcWaitTime = 
CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS;
+      } else {
+        _gcWaitTime = 
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS));
+      }
+    } else {
+      _gcWaitTime = oldConfig._gcWaitTime;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) {
+        _alarmingSleepTimeDenominator = 
CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR;
+      } else {
+        _alarmingSleepTimeDenominator =
+            
Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR));
+      }
+    } else {
+      _alarmingSleepTimeDenominator = oldConfig._alarmingSleepTimeDenominator;
+    }
+
+    _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator;
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)) {
+        _oomKillQueryEnabled = 
CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY;
+      } else {
+        _oomKillQueryEnabled =
+            
Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY));
+      }
+    } else {
+      _oomKillQueryEnabled = oldConfig._oomKillQueryEnabled;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)) {
+        _publishHeapUsageMetric = 
CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE;
+      } else {
+        _publishHeapUsageMetric =
+            
Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE));
+      }
+    } else {
+      _publishHeapUsageMetric = oldConfig._publishHeapUsageMetric;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)) {
+        _isCPUTimeBasedKillingEnabled = 
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED;
+      } else {
+        _isCPUTimeBasedKillingEnabled = Boolean.parseBoolean(
+            
clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED));
+      }
+    } else {
+      _isCPUTimeBasedKillingEnabled = oldConfig._isCPUTimeBasedKillingEnabled;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          
CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)) {
+        _cpuTimeBasedKillingThresholdNS =
+            
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS * 
1000_000L;
+      } else {
+        _cpuTimeBasedKillingThresholdNS =
+            
Long.parseLong(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS))
+                * 1000_000L;
+      }
+    } else {
+      _cpuTimeBasedKillingThresholdNS = 
oldConfig._cpuTimeBasedKillingThresholdNS;
+    }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED))
 {
+      if (clusterConfigs == null || !clusterConfigs.containsKey(
+          CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)) {
+        _isQueryKilledMetricEnabled = 
CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED;
+      } else {
+        _isQueryKilledMetricEnabled =
+            
Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED));
+      }
+    } else {
+      _isQueryKilledMetricEnabled = oldConfig._isQueryKilledMetricEnabled;
+    }
+  }
+
+  public long getMaxHeapSize() {
+    return _maxHeapSize;
+  }
+
+  public long getMinMemoryFootprintForKill() {
+    return _minMemoryFootprintForKill;
+  }
+
+  public long getPanicLevel() {
+    return _panicLevel;
+  }
+
+  public long getCriticalLevel() {
+    return _criticalLevel;
+  }
+
+  public long getCriticalLevelAfterGC() {
+    return _criticalLevelAfterGC;
+  }
+
+  public int getGcBackoffCount() {
+    return _gcBackoffCount;
+  }
+
+  public long getAlarmingLevel() {
+    return _alarmingLevel;
+  }
+
+  public int getNormalSleepTime() {
+    return _normalSleepTime;
+  }
+
+  public int getGcWaitTime() {
+    return _gcWaitTime;
+  }
+
+  public int getAlarmingSleepTime() {
+    return _alarmingSleepTime;
+  }
+
+  public boolean isOomKillQueryEnabled() {
+    return _oomKillQueryEnabled;
+  }
+
+  public boolean isPublishHeapUsageMetric() {
+    return _publishHeapUsageMetric;
+  }
+
+  public boolean isCpuTimeBasedKillingEnabled() {
+    return _isCPUTimeBasedKillingEnabled;
+  }
+
+  public long getCpuTimeBasedKillingThresholdNS() {
+    return _cpuTimeBasedKillingThresholdNS;
+  }
+
+  public boolean isQueryKilledMetricEnabled() {
+    return _isQueryKilledMetricEnabled;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java
new file mode 100644
index 0000000000..8dc68670c6
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class PerQueryCPUMemAccountantFactoryTest {
+  private static final double EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL = 0.05;
+  private static final double EXPECTED_PANIC_LEVEL = 0.9f;
+  private static final double EXPECTED_CRITICAL_LEVEL = 0.95f;
+  private static final double EXPECTED_CRITICAL_LEVEL_AFTER_GC = 0.05f;
+  private static final int EXPECTED_GC_BACKOFF_COUNT = 3;
+  private static final double EXPECTED_ALARMING_LEVEL = 0.8f;
+  private static final int EXPECTED_NORMAL_SLEEP_TIME = 50;
+  private static final int EXPECTED_GC_WAIT_TIME = 1000;
+  private static final int EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR = 2;
+  private static final boolean EXPECTED_OOM_KILL_QUERY_ENABLED = true;
+  private static final boolean EXPECTED_PUBLISH_HEAP_USAGE_METRIC = true;
+  private static final boolean EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED = 
true;
+  private static final long EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS = 
1000;
+  private static final boolean EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED = true;
+  private static final Map<String, String> CLUSTER_CONFIGS = new HashMap<>();
+
+  private static String getFullyQualifiedConfigName(String config) {
+    return CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + config;
+  }
+
+  @BeforeClass
+  public void setUp() {
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY),
+        Boolean.toString(EXPECTED_OOM_KILL_QUERY_ENABLED));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE),
+        Boolean.toString(EXPECTED_PUBLISH_HEAP_USAGE_METRIC));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED),
+        Boolean.toString(EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS),
+        Long.toString(EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO),
+        Double.toString(EXPECTED_PANIC_LEVEL));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO),
+        Double.toString(EXPECTED_CRITICAL_LEVEL));
+    CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(
+            
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC),
+        Double.toString(EXPECTED_CRITICAL_LEVEL_AFTER_GC));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO),
+        Double.toString(EXPECTED_ALARMING_LEVEL));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS),
+        Integer.toString(EXPECTED_NORMAL_SLEEP_TIME));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR),
+        Integer.toString(EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR));
+    CLUSTER_CONFIGS.put(
+        
getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO),
+        Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT),
+        Integer.toString(EXPECTED_GC_BACKOFF_COUNT));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS),
+        Integer.toString(EXPECTED_GC_WAIT_TIME));
+    
CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED),
+        Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED));
+  }
+
+  @Test
+  void testOOMProtectionKillingQueryConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isOomKillQueryEnabled());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)),
+        CLUSTER_CONFIGS);
+    
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isOomKillQueryEnabled());
+  }
+
+  @Test
+  void testPublishHeapUsageMetricConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isPublishHeapUsageMetric());
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)),
+            CLUSTER_CONFIGS);
+    
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isPublishHeapUsageMetric());
+  }
+
+  @Test
+  void testCPUTimeBasedKillingEnabledConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isCpuTimeBasedKillingEnabled());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)),
+        CLUSTER_CONFIGS);
+    
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isCpuTimeBasedKillingEnabled());
+  }
+
+  @Test
+  void testCPUTimeBasedKillingThresholdConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCpuTimeBasedKillingThresholdNS(),
+        CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS 
* 1000_000L);
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)),
+        CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCpuTimeBasedKillingThresholdNS(),
+        EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS * 1000_000L);
+  }
+
+  @Test
+  void testPanicLevelHeapUsageRatioConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getPanicLevel(),
+        CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO * 
accountant.getWatcherTask()
+            .getQueryMonitorConfig().getMaxHeapSize());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)),
+        CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getPanicLevel(),
+        EXPECTED_PANIC_LEVEL * 
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
+  }
+
+  @Test
+  void testCriticalLevelHeapUsageRatioConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel(),
+        CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO * 
accountant.getWatcherTask()
+            .getQueryMonitorConfig().getMaxHeapSize());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)),
+        CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel(),
+        EXPECTED_CRITICAL_LEVEL * 
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
+  }
+
+  @Test
+  void testCriticalLevelHeapUsageRatioDeltaAfterGCConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
+        accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
+            - 
CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC
+            * 
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
+    accountant.getWatcherTask().onChange(Set.of(getFullyQualifiedConfigName(
+        
CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)),
 CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(),
+        accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel()
+            - EXPECTED_CRITICAL_LEVEL_AFTER_GC * 
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
+  }
+
+  @Test
+  void testAlarmingLevelHeapUsageRatioConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(),
+        CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO * 
accountant.getWatcherTask()
+            .getQueryMonitorConfig().getMaxHeapSize());
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)),
+        CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(),
+        EXPECTED_ALARMING_LEVEL * 
accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize());
+  }
+
+  @Test
+  void testSleepTimeConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime(),
+        CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)),
+            CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime(),
 EXPECTED_NORMAL_SLEEP_TIME);
+  }
+
+  @Test
+  void testSleepTimeDenominatorConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingSleepTime(),
+        
accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime()
+            / CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)),
+            CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingSleepTime(),
+        
accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime()
+            / EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR);
+  }
+
+  @Test
+  void testMinMemoryFootprintToKillRatioConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getMinMemoryFootprintForKill(),
+        (long) 
(CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO * 
accountant.getWatcherTask()
+            .getQueryMonitorConfig().getMaxHeapSize()));
+    accountant.getWatcherTask().onChange(
+        
Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)),
+        CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getMinMemoryFootprintForKill(),
+        (long) (EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL * 
accountant.getWatcherTask().getQueryMonitorConfig()
+            .getMaxHeapSize()));
+  }
+
+  @Test
+  void testGCBackoffCountConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(),
+        CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)),
+            CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(),
 EXPECTED_GC_BACKOFF_COUNT);
+  }
+
+  @Test
+  void testGCWaitTimeConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(),
+        CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)),
+            CLUSTER_CONFIGS);
+    
assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(),
 EXPECTED_GC_WAIT_TIME);
+  }
+
+  @Test
+  void testQueryKilledMetricEnabledConfigChange() {
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        new 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new 
PinotConfiguration(), "test",
+            InstanceType.SERVER);
+
+    
assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled());
+    accountant.getWatcherTask()
+        
.onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)),
+            CLUSTER_CONFIGS);
+    
assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled());
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 88313fd828..fa8b8fa6bc 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -675,6 +675,11 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     Tracing.ThreadAccountantOps.initializeThreadAccountant(
         _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
         org.apache.pinot.spi.config.instance.InstanceType.SERVER);
+    if (Tracing.getThreadAccountant().getClusterConfigChangeListener() != 
null) {
+      _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+          Tracing.getThreadAccountant().getClusterConfigChangeListener());
+    }
+
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, 
instanceDataManager);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index dd9a8e9c3f..b3a270aea1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.accounting;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 
 
 public interface ThreadResourceUsageAccountant {
@@ -98,6 +99,11 @@ public interface ThreadResourceUsageAccountant {
    */
   void startWatcherTask();
 
+  @Nullable
+  default PinotClusterConfigChangeListener getClusterConfigChangeListener() {
+    return null;
+  }
+
   /**
    * get error status if the query is preempted
    * @return empty string if N/A


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to