This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 29b13f61a4 Dynamic PerQueryCPUMemAccountant Config on Servers (#16219) 29b13f61a4 is described below commit 29b13f61a4aeeaa3d4b1aabc18dc32bf8d7a0836 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Tue Jul 1 17:09:41 2025 +0530 Dynamic PerQueryCPUMemAccountant Config on Servers (#16219) * Checkpoint * Register change handler * Fix bugs. Manually tested * Checkstyle * Tests * Add pre-check that values are default * Undo typo fix --- .../PerQueryCPUMemAccountantFactory.java | 265 +++++++-------- .../pinot/core/accounting/QueryMonitorConfig.java | 369 +++++++++++++++++++++ .../PerQueryCPUMemAccountantFactoryTest.java | 301 +++++++++++++++++ .../server/starter/helix/BaseServerStarter.java | 5 + .../accounting/ThreadResourceUsageAccountant.java | 6 + 5 files changed, 799 insertions(+), 147 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index f733e81ba7..aa387807d5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.pinot.common.metrics.AbstractMetrics; import org.apache.pinot.common.metrics.BrokerGauge; @@ -47,6 +48,7 @@ import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; @@ -87,24 +89,24 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory return thread; }); - private final PinotConfiguration _config; + protected final PinotConfiguration _config; // the map to track stats entry for each thread, the entry will automatically be added when one calls // setThreadResourceUsageProvider on the thread, including but not limited to // server worker thread, runner thread, broker jetty thread, or broker netty thread - private final ConcurrentHashMap<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadEntriesMap + protected final ConcurrentHashMap<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadEntriesMap = new ConcurrentHashMap<>(); // For one time concurrent update of stats. This is to provide stats collection for parts that are not // performance sensitive and query_id is not known beforehand (e.g. broker inbound netty thread) - private final ConcurrentHashMap<String, Long> _concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<String, Long> _concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<String, Long> _concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<String, Long> _concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>(); // for stats aggregation of finished (worker) threads when the runner is still running - private final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>(); - private final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>(); + protected final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>(); + protected final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>(); - private final ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadLocalEntry + protected final ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadLocalEntry = ThreadLocal.withInitial(() -> { CPUMemThreadLevelAccountingObjects.ThreadEntry ret = new CPUMemThreadLevelAccountingObjects.ThreadEntry(); @@ -115,23 +117,23 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory ); // track thread cpu time - private final boolean _isThreadCPUSamplingEnabled; + protected final boolean _isThreadCPUSamplingEnabled; // track memory usage - private final boolean _isThreadMemorySamplingEnabled; + protected final boolean _isThreadMemorySamplingEnabled; // is sampling allowed for MSE queries - private final boolean _isThreadSamplingEnabledForMSE; + protected final boolean _isThreadSamplingEnabledForMSE; - private final Set<String> _inactiveQuery; + protected final Set<String> _inactiveQuery; // the periodical task that aggregates and preempts queries - private final WatcherTask _watcherTask; + protected final WatcherTask _watcherTask; // instance id of the current instance, for logging purpose - private final String _instanceId; + protected final String _instanceId; - private final InstanceType _instanceType; + protected final InstanceType _instanceType; public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId, InstanceType instanceType) { @@ -263,13 +265,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory @Nullable ThreadExecutionContext parentContext) { } - /** - * for testing only - */ - public int getEntryCount() { - return _threadEntriesMap.size(); - } - @Override @Deprecated public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) { @@ -354,11 +349,20 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory threadEntry.setToIdle(); } + public WatcherTask getWatcherTask() { + return _watcherTask; + } + @Override public void startWatcherTask() { EXECUTOR_SERVICE.submit(_watcherTask); } + @Override + public PinotClusterConfigChangeListener getClusterConfigChangeListener() { + return _watcherTask; + } + /** * remove in active queries from _finishedTaskStatAggregator */ @@ -577,95 +581,13 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory /** * A watcher task to perform usage sampling, aggregation, and query preemption */ - class WatcherTask implements Runnable { - - // max heap usage, Xmx - private final long _maxHeapSize = MEMORY_MX_BEAN.getHeapMemoryUsage().getMax(); - - // don't kill a query if its memory footprint is below some ratio of _maxHeapSize - private final long _minMemoryFootprintForKill = (long) (_maxHeapSize - * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, - CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO)); - - // kill all queries if heap usage exceeds this - private final long _panicLevel = (long) (_maxHeapSize - * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, - CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO)); - - // kill the most expensive query if heap usage exceeds this - private final long _criticalLevel = (long) (_maxHeapSize - * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, - CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO)); - - // if after gc the heap usage is still above this, kill the most expensive query - // use this to prevent heap size oscillation and repeatedly triggering gc - private final long _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize - * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC, - CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)); - - // trigger gc if consecutively kill more than some number of queries - // set this to 0 to always trigger gc before killing a query to give gc a second chance - // as would minimize the chance of false positive killing in some usecases - // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for some gc algorithms - private final int _gcBackoffCount = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT, - CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT); - - // start to sample more frequently if heap usage exceeds this - private final long _alarmingLevel = - (long) (_maxHeapSize - * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, - CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO)); - - // normal sleep time - private final int _normalSleepTime = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS, - CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS); - - // wait for gc to complete, according to system.gc() javadoc, when control returns from the method call, - // the Java Virtual Machine has made a best effort to reclaim space from all discarded objects. - // Therefore, we default this to 0. - // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent - private final int _gcWaitTime = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS, - CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS); - - // alarming sleep time denominator, should be > 1 to sample more frequent at alarming level - private final int _alarmingSleepTimeDenominator = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR, - CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR); - - // alarming sleep time - private final int _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator; - - // the framework would not commit to kill any query if this is disabled - private final boolean _oomKillQueryEnabled = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, - CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY); - - // if we want to publish the heap usage - private final boolean _publishHeapUsageMetric = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE, - CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE); - - // if we want kill query based on CPU time - private final boolean _isCPUTimeBasedKillingEnabled = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, - CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED) && _isThreadCPUSamplingEnabled; - - // CPU time based killing threshold - private final long _cpuTimeBasedKillingThresholdNS = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS, - CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L; - - // - private final boolean _isQueryKilledMetricEnabled = - _config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED, - CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED); - - private long _usedBytes; - private int _sleepTime; - private int _numQueriesKilledConsecutively = 0; + public class WatcherTask implements Runnable, PinotClusterConfigChangeListener { + + protected AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>(); + + protected long _usedBytes; + protected int _sleepTime; + protected int _numQueriesKilledConsecutively = 0; protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery; private TriggeringLevel _triggeringLevel; @@ -677,6 +599,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory private final AbstractMetrics.Gauge _memoryUsageGauge; WatcherTask() { + _queryMonitorConfig.set(new QueryMonitorConfig(_config, MEMORY_MX_BEAN.getHeapMemoryUsage().getMax())); + logQueryMonitorConfig(); + switch (_instanceType) { case SERVER: _metrics = ServerMetrics.get(); @@ -703,29 +628,64 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } } + public QueryMonitorConfig getQueryMonitorConfig() { + return _queryMonitorConfig.get(); + } + @Override - public void run() { + public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { + // Filter configs that have CommonConstants.PREFIX_SCHEDULER_PREFIX + Set<String> filteredChangedConfigs = + changedConfigs.stream().filter(config -> config.startsWith(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX)) + .map(config -> config.replace(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".", "")) + .collect(Collectors.toSet()); + + if (filteredChangedConfigs.isEmpty()) { + LOGGER.debug("No relevant configs changed, skipping update for QueryMonitorConfig."); + return; + } + + Map<String, String> filteredClusterConfigs = clusterConfigs.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX)).collect( + Collectors.toMap( + entry -> entry.getKey().replace(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + ".", ""), + Map.Entry::getValue)); + + QueryMonitorConfig oldConfig = _queryMonitorConfig.get(); + QueryMonitorConfig newConfig = + new QueryMonitorConfig(oldConfig, filteredChangedConfigs, filteredClusterConfigs); + _queryMonitorConfig.set(newConfig); + logQueryMonitorConfig(); + } + + private void logQueryMonitorConfig() { + QueryMonitorConfig queryMonitorConfig = _queryMonitorConfig.get(); // Log info for the accountant configs - LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant."); - LOGGER.info("Xmx is {}", _maxHeapSize); + LOGGER.info("Updated Configuration for Query Monitor"); + LOGGER.info("Xmx is {}", queryMonitorConfig.getMaxHeapSize()); LOGGER.info("_instanceType is {}", _instanceType); - LOGGER.info("_alarmingLevel of on heap memory is {}", _alarmingLevel); - LOGGER.info("_criticalLevel of on heap memory is {}", _criticalLevel); - LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", _criticalLevelAfterGC); - LOGGER.info("_panicLevel of on heap memory is {}", _panicLevel); - LOGGER.info("_gcBackoffCount is {}", _gcBackoffCount); - LOGGER.info("_gcWaitTime is {}", _gcWaitTime); - LOGGER.info("_normalSleepTime is {}", _normalSleepTime); - LOGGER.info("_alarmingSleepTime is {}", _alarmingSleepTime); - LOGGER.info("_oomKillQueryEnabled: {}", _oomKillQueryEnabled); - LOGGER.info("_minMemoryFootprintForKill: {}", _minMemoryFootprintForKill); + LOGGER.info("_alarmingLevel of on heap memory is {}", queryMonitorConfig.getAlarmingLevel()); + LOGGER.info("_criticalLevel of on heap memory is {}", queryMonitorConfig.getCriticalLevel()); + LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", queryMonitorConfig.getCriticalLevelAfterGC()); + LOGGER.info("_panicLevel of on heap memory is {}", queryMonitorConfig.getPanicLevel()); + LOGGER.info("_gcBackoffCount is {}", queryMonitorConfig.getGcBackoffCount()); + LOGGER.info("_gcWaitTime is {}", queryMonitorConfig.getGcWaitTime()); + LOGGER.info("_normalSleepTime is {}", queryMonitorConfig.getNormalSleepTime()); + LOGGER.info("_alarmingSleepTime is {}", queryMonitorConfig.getAlarmingSleepTime()); + LOGGER.info("_oomKillQueryEnabled: {}", queryMonitorConfig.isOomKillQueryEnabled()); + LOGGER.info("_minMemoryFootprintForKill: {}", queryMonitorConfig.getMinMemoryFootprintForKill()); LOGGER.info("_isCPUTimeBasedKillingEnabled: {}, _cpuTimeBasedKillingThresholdNS: {}", - _isCPUTimeBasedKillingEnabled, _cpuTimeBasedKillingThresholdNS); + queryMonitorConfig.isCpuTimeBasedKillingEnabled(), queryMonitorConfig.getCpuTimeBasedKillingThresholdNS()); + } + @Override + public void run() { while (true) { + QueryMonitorConfig config = _queryMonitorConfig.get(); + LOGGER.debug("Running timed task for PerQueryCPUMemAccountant."); _triggeringLevel = TriggeringLevel.Normal; - _sleepTime = _normalSleepTime; + _sleepTime = config.getNormalSleepTime(); _aggregatedUsagePerActiveQuery = null; try { // Get the metrics used for triggering the kill @@ -750,7 +710,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory LOGGER.debug("_threadEntriesMap size: {}", _threadEntriesMap.size()); // Publish server heap usage metrics - if (_publishHeapUsageMetric) { + if (config.isPublishHeapUsageMetric()) { _metrics.setValueOfGlobalGauge(_memoryUsageGauge, _usedBytes); } // Clean inactive query stats @@ -771,14 +731,15 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory * @return if panic mode is triggered */ private boolean outOfMemoryPanicTrigger() { + long panicLevel = _queryMonitorConfig.get().getPanicLevel(); // at this point we assume we have tried to kill some queries and the gc kicked in // we have no choice but to kill all queries - if (_usedBytes >= _panicLevel) { + if (_usedBytes >= panicLevel) { killAllQueries(); _triggeringLevel = TriggeringLevel.HeapMemoryPanic; _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1); LOGGER.error("Heap used bytes {}, greater than _panicLevel {}, Killed all queries and triggered gc!", - _usedBytes, _panicLevel); + _usedBytes, panicLevel); // call aggregate here as will throw exception and aggregate(false); return true; @@ -791,15 +752,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory * Triggers should be mutually exclusive and evaluated following level high -> low */ private void evalTriggers() { - if (_isCPUTimeBasedKillingEnabled) { + QueryMonitorConfig config = _queryMonitorConfig.get(); + + if (config.isCpuTimeBasedKillingEnabled()) { _triggeringLevel = TriggeringLevel.CPUTimeBasedKilling; } - if (_usedBytes > _criticalLevel) { + if (_usedBytes > config.getCriticalLevel()) { _triggeringLevel = TriggeringLevel.HeapMemoryCritical; _metrics.addMeteredGlobalValue(_heapMemoryCriticalExceededMeter, 1); - } else if (_usedBytes > _alarmingLevel) { - _sleepTime = _alarmingSleepTime; + } else if (_usedBytes > config.getAlarmingLevel()) { + _sleepTime = config.getAlarmingSleepTime(); // For debugging _triggeringLevel = (IS_DEBUG_MODE_ENABLED && _triggeringLevel == TriggeringLevel.Normal) ? TriggeringLevel.HeapMemoryAlarmingVerbose : _triggeringLevel; @@ -812,7 +775,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory private void triggeredActions() { switch (_triggeringLevel) { case HeapMemoryCritical: - LOGGER.warn("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel); + LOGGER.warn("Heap used bytes {} exceeds critical level {}", _usedBytes, + _queryMonitorConfig.get().getCriticalLevel()); killMostExpensiveQuery(); break; case CPUTimeBasedKilling: @@ -837,7 +801,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } void killAllQueries() { - if (_oomKillQueryEnabled) { + QueryMonitorConfig config = _queryMonitorConfig.get(); + + if (config.isOomKillQueryEnabled()) { int killedCount = 0; for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) { CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue(); @@ -849,11 +815,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory killedCount += 1; } } - if (_isQueryKilledMetricEnabled) { + if (config.isQueryKilledMetricEnabled()) { _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount); } try { - Thread.sleep(_normalSleepTime); + Thread.sleep(config.getNormalSleepTime()); } catch (InterruptedException ignored) { } // In this extreme case we directly trigger system.gc @@ -868,19 +834,21 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory * use XX:+ExplicitGCInvokesConcurrent to avoid a full gc when system.gc is triggered */ private void killMostExpensiveQuery() { - if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcBackoffCount) { + QueryMonitorConfig config = _queryMonitorConfig.get(); + if (!_aggregatedUsagePerActiveQuery.isEmpty() + && _numQueriesKilledConsecutively >= config.getGcBackoffCount()) { _numQueriesKilledConsecutively = 0; System.gc(); try { - Thread.sleep(_gcWaitTime); + Thread.sleep(config.getGcWaitTime()); } catch (InterruptedException ignored) { } _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed(); - if (_usedBytes < _criticalLevelAfterGC) { + if (_usedBytes < config.getCriticalLevelAfterGC()) { return; } - LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}", - _usedBytes, _criticalLevelAfterGC); + LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}", _usedBytes, + config.getCriticalLevelAfterGC()); } if (!(_isThreadMemorySamplingEnabled || _isThreadCPUSamplingEnabled)) { LOGGER.warn("But unable to kill query because neither memory nor cpu tracking is enabled"); @@ -895,7 +863,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory if (_isThreadMemorySamplingEnabled) { maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(), Comparator.comparing(AggregatedStats::getAllocatedBytes)); - boolean shouldKill = _oomKillQueryEnabled && maxUsageTuple._allocatedBytes > _minMemoryFootprintForKill; + boolean shouldKill = config.isOomKillQueryEnabled() + && maxUsageTuple._allocatedBytes > config.getMinMemoryFootprintForKill(); if (shouldKill) { maxUsageTuple._exceptionAtomicReference .set(new RuntimeException(String.format( @@ -903,7 +872,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId))); interruptRunnerThread(maxUsageTuple.getAnchorThread()); logTerminatedQuery(maxUsageTuple, _usedBytes); - } else if (!_oomKillQueryEnabled) { + } else if (!config.isOomKillQueryEnabled()) { LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false " + "because oomKillQueryEnabled is false", maxUsageTuple._queryId, maxUsageTuple._allocatedBytes); @@ -915,16 +884,18 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } private void killCPUTimeExceedQueries() { + QueryMonitorConfig config = _queryMonitorConfig.get(); + for (Map.Entry<String, AggregatedStats> entry : _aggregatedUsagePerActiveQuery.entrySet()) { AggregatedStats value = entry.getValue(); - if (value._cpuNS > _cpuTimeBasedKillingThresholdNS) { + if (value._cpuNS > config.getCpuTimeBasedKillingThresholdNS()) { LOGGER.error("Current task status recorded is {}. Query {} got picked because using {} ns of cpu time," + " greater than threshold {}", _threadEntriesMap, value._queryId, value.getCpuTimeNs(), - _cpuTimeBasedKillingThresholdNS); + config.getCpuTimeBasedKillingThresholdNS()); value._exceptionAtomicReference.set(new RuntimeException( String.format("Query %s got killed on %s: %s because using %d " + "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId, - value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS))); + value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS()))); interruptRunnerThread(value.getAnchorThread()); logTerminatedQuery(value, _usedBytes); } @@ -934,7 +905,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory private void interruptRunnerThread(Thread thread) { thread.interrupt(); - if (_isQueryKilledMetricEnabled) { + if (_queryMonitorConfig.get().isQueryKilledMetricEnabled()) { _metrics.addMeteredGlobalValue(_queryKilledMeter, 1); } _numQueriesKilledConsecutively += 1; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java new file mode 100644 index 0000000000..ba4b05aa81 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.accounting; + +import java.util.Map; +import java.util.Set; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; + + +public class QueryMonitorConfig { + private final long _maxHeapSize; + + // don't kill a query if its memory footprint is below some ratio of _maxHeapSize + private final long _minMemoryFootprintForKill; + + // kill all queries if heap usage exceeds this + private final long _panicLevel; + + // kill the most expensive query if heap usage exceeds this + private final long _criticalLevel; + + // if after gc the heap usage is still above this, kill the most expensive query + // use this to prevent heap size oscillation and repeatedly triggering gc + private final long _criticalLevelAfterGC; + + // trigger gc if consecutively kill more than some number of queries + // set this to 0 to always trigger gc before killing a query to give gc a second chance + // as would minimize the chance of false positive killing in some usecases + // should consider use -XX:+ExplicitGCInvokesConcurrent to avoid STW for some gc algorithms + private final int _gcBackoffCount; + + // start to sample more frequently if heap usage exceeds this + private final long _alarmingLevel; + + // normal sleep time + private final int _normalSleepTime; + + // wait for gc to complete, according to system.gc() javadoc, when control returns from the method call, + // the Java Virtual Machine has made a best effort to reclaim space from all discarded objects. + // Therefore, we default this to 0. + // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent + private final int _gcWaitTime; + + // alarming sleep time denominator, should be > 1 to sample more frequent at alarming level + private final int _alarmingSleepTimeDenominator; + + // alarming sleep time + private final int _alarmingSleepTime; + + // the framework would not commit to kill any query if this is disabled + private final boolean _oomKillQueryEnabled; + + // if we want to publish the heap usage + private final boolean _publishHeapUsageMetric; + + // if we want kill query based on CPU time + private final boolean _isCPUTimeBasedKillingEnabled; + + // CPU time based killing threshold + private final long _cpuTimeBasedKillingThresholdNS; + + private final boolean _isQueryKilledMetricEnabled; + + public QueryMonitorConfig(PinotConfiguration config, long maxHeapSize) { + _maxHeapSize = maxHeapSize; + + _minMemoryFootprintForKill = (long) (maxHeapSize * config.getProperty( + CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, + CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO)); + + _panicLevel = + (long) (maxHeapSize * config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, + CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO)); + + // kill the most expensive query if heap usage exceeds this + _criticalLevel = + (long) (maxHeapSize * config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, + CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO)); + + _criticalLevelAfterGC = _criticalLevel - (long) (maxHeapSize * config.getProperty( + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC, + CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)); + + _gcBackoffCount = config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT, + CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT); + + _alarmingLevel = + (long) (maxHeapSize * config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, + CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO)); + + _normalSleepTime = config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS, + CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS); + + _gcWaitTime = config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS, + CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS); + + _alarmingSleepTimeDenominator = config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR, + CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR); + + _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator; + + _oomKillQueryEnabled = config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, + CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY); + + _publishHeapUsageMetric = config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE, + CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE); + + _isCPUTimeBasedKillingEnabled = + config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, + CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED); + + _cpuTimeBasedKillingThresholdNS = + config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS, + CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L; + + _isQueryKilledMetricEnabled = config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED, + CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED); + } + + QueryMonitorConfig(QueryMonitorConfig oldConfig, Set<String> changedConfigs, Map<String, String> clusterConfigs) { + _maxHeapSize = oldConfig._maxHeapSize; + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)) { + _minMemoryFootprintForKill = + (long) (_maxHeapSize * CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO); + } else { + _minMemoryFootprintForKill = (long) (_maxHeapSize * Double.parseDouble( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO))); + } + } else { + _minMemoryFootprintForKill = oldConfig._minMemoryFootprintForKill; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)) { + _panicLevel = (long) (_maxHeapSize * CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO); + } else { + _panicLevel = (long) (_maxHeapSize * Double.parseDouble( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO))); + } + } else { + _panicLevel = oldConfig._panicLevel; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)) { + _criticalLevel = (long) (_maxHeapSize * CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO); + } else { + _criticalLevel = (long) (_maxHeapSize * Double.parseDouble( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO))); + } + } else { + _criticalLevel = oldConfig._criticalLevel; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)) { + _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize + * CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC); + } else { + _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize * Double.parseDouble( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC))); + } + } else { + _criticalLevelAfterGC = oldConfig._criticalLevelAfterGC; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)) { + _gcBackoffCount = CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT; + } else { + _gcBackoffCount = Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)); + } + } else { + _gcBackoffCount = oldConfig._gcBackoffCount; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)) { + _alarmingLevel = (long) (_maxHeapSize * CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO); + } else { + _alarmingLevel = (long) (_maxHeapSize * Double.parseDouble( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO))); + } + } else { + _alarmingLevel = oldConfig._alarmingLevel; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)) { + if (clusterConfigs == null || !clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)) { + _normalSleepTime = CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS; + } else { + _normalSleepTime = Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)); + } + } else { + _normalSleepTime = oldConfig._normalSleepTime; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)) { + if (clusterConfigs == null || !clusterConfigs.containsKey(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)) { + _gcWaitTime = CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS; + } else { + _gcWaitTime = Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)); + } + } else { + _gcWaitTime = oldConfig._gcWaitTime; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)) { + _alarmingSleepTimeDenominator = CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR; + } else { + _alarmingSleepTimeDenominator = + Integer.parseInt(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)); + } + } else { + _alarmingSleepTimeDenominator = oldConfig._alarmingSleepTimeDenominator; + } + + _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator; + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)) { + _oomKillQueryEnabled = CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY; + } else { + _oomKillQueryEnabled = + Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)); + } + } else { + _oomKillQueryEnabled = oldConfig._oomKillQueryEnabled; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)) { + _publishHeapUsageMetric = CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE; + } else { + _publishHeapUsageMetric = + Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)); + } + } else { + _publishHeapUsageMetric = oldConfig._publishHeapUsageMetric; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)) { + _isCPUTimeBasedKillingEnabled = CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_ENABLED; + } else { + _isCPUTimeBasedKillingEnabled = Boolean.parseBoolean( + clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)); + } + } else { + _isCPUTimeBasedKillingEnabled = oldConfig._isCPUTimeBasedKillingEnabled; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)) { + _cpuTimeBasedKillingThresholdNS = + CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS * 1000_000L; + } else { + _cpuTimeBasedKillingThresholdNS = + Long.parseLong(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)) + * 1000_000L; + } + } else { + _cpuTimeBasedKillingThresholdNS = oldConfig._cpuTimeBasedKillingThresholdNS; + } + + if (changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)) { + if (clusterConfigs == null || !clusterConfigs.containsKey( + CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)) { + _isQueryKilledMetricEnabled = CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED; + } else { + _isQueryKilledMetricEnabled = + Boolean.parseBoolean(clusterConfigs.get(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)); + } + } else { + _isQueryKilledMetricEnabled = oldConfig._isQueryKilledMetricEnabled; + } + } + + public long getMaxHeapSize() { + return _maxHeapSize; + } + + public long getMinMemoryFootprintForKill() { + return _minMemoryFootprintForKill; + } + + public long getPanicLevel() { + return _panicLevel; + } + + public long getCriticalLevel() { + return _criticalLevel; + } + + public long getCriticalLevelAfterGC() { + return _criticalLevelAfterGC; + } + + public int getGcBackoffCount() { + return _gcBackoffCount; + } + + public long getAlarmingLevel() { + return _alarmingLevel; + } + + public int getNormalSleepTime() { + return _normalSleepTime; + } + + public int getGcWaitTime() { + return _gcWaitTime; + } + + public int getAlarmingSleepTime() { + return _alarmingSleepTime; + } + + public boolean isOomKillQueryEnabled() { + return _oomKillQueryEnabled; + } + + public boolean isPublishHeapUsageMetric() { + return _publishHeapUsageMetric; + } + + public boolean isCpuTimeBasedKillingEnabled() { + return _isCPUTimeBasedKillingEnabled; + } + + public long getCpuTimeBasedKillingThresholdNS() { + return _cpuTimeBasedKillingThresholdNS; + } + + public boolean isQueryKilledMetricEnabled() { + return _isQueryKilledMetricEnabled; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java new file mode 100644 index 0000000000..8dc68670c6 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryTest.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.accounting; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class PerQueryCPUMemAccountantFactoryTest { + private static final double EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL = 0.05; + private static final double EXPECTED_PANIC_LEVEL = 0.9f; + private static final double EXPECTED_CRITICAL_LEVEL = 0.95f; + private static final double EXPECTED_CRITICAL_LEVEL_AFTER_GC = 0.05f; + private static final int EXPECTED_GC_BACKOFF_COUNT = 3; + private static final double EXPECTED_ALARMING_LEVEL = 0.8f; + private static final int EXPECTED_NORMAL_SLEEP_TIME = 50; + private static final int EXPECTED_GC_WAIT_TIME = 1000; + private static final int EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR = 2; + private static final boolean EXPECTED_OOM_KILL_QUERY_ENABLED = true; + private static final boolean EXPECTED_PUBLISH_HEAP_USAGE_METRIC = true; + private static final boolean EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED = true; + private static final long EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS = 1000; + private static final boolean EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED = true; + private static final Map<String, String> CLUSTER_CONFIGS = new HashMap<>(); + + private static String getFullyQualifiedConfigName(String config) { + return CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + config; + } + + @BeforeClass + public void setUp() { + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY), + Boolean.toString(EXPECTED_OOM_KILL_QUERY_ENABLED)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE), + Boolean.toString(EXPECTED_PUBLISH_HEAP_USAGE_METRIC)); + CLUSTER_CONFIGS.put( + getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED), + Boolean.toString(EXPECTED_IS_CPU_TIME_BASED_KILLING_ENABLED)); + CLUSTER_CONFIGS.put( + getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS), + Long.toString(EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO), + Double.toString(EXPECTED_PANIC_LEVEL)); + CLUSTER_CONFIGS.put( + getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO), + Double.toString(EXPECTED_CRITICAL_LEVEL)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName( + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC), + Double.toString(EXPECTED_CRITICAL_LEVEL_AFTER_GC)); + CLUSTER_CONFIGS.put( + getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO), + Double.toString(EXPECTED_ALARMING_LEVEL)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS), + Integer.toString(EXPECTED_NORMAL_SLEEP_TIME)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR), + Integer.toString(EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR)); + CLUSTER_CONFIGS.put( + getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO), + Double.toString(EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT), + Integer.toString(EXPECTED_GC_BACKOFF_COUNT)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS), + Integer.toString(EXPECTED_GC_WAIT_TIME)); + CLUSTER_CONFIGS.put(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED), + Boolean.toString(EXPECTED_IS_QUERY_KILLED_METRIC_ENABLED)); + } + + @Test + void testOOMProtectionKillingQueryConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isOomKillQueryEnabled()); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY)), + CLUSTER_CONFIGS); + assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isOomKillQueryEnabled()); + } + + @Test + void testPublishHeapUsageMetricConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isPublishHeapUsageMetric()); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE)), + CLUSTER_CONFIGS); + assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isPublishHeapUsageMetric()); + } + + @Test + void testCPUTimeBasedKillingEnabledConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isCpuTimeBasedKillingEnabled()); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED)), + CLUSTER_CONFIGS); + assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isCpuTimeBasedKillingEnabled()); + } + + @Test + void testCPUTimeBasedKillingThresholdConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCpuTimeBasedKillingThresholdNS(), + CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS * 1000_000L); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCpuTimeBasedKillingThresholdNS(), + EXPECTED_CPU_TIME_BASED_KILLING_THRESHOLD_NS * 1000_000L); + } + + @Test + void testPanicLevelHeapUsageRatioConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getPanicLevel(), + CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO * accountant.getWatcherTask() + .getQueryMonitorConfig().getMaxHeapSize()); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getPanicLevel(), + EXPECTED_PANIC_LEVEL * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize()); + } + + @Test + void testCriticalLevelHeapUsageRatioConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel(), + CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO * accountant.getWatcherTask() + .getQueryMonitorConfig().getMaxHeapSize()); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel(), + EXPECTED_CRITICAL_LEVEL * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize()); + } + + @Test + void testCriticalLevelHeapUsageRatioDeltaAfterGCConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(), + accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel() + - CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC + * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize()); + accountant.getWatcherTask().onChange(Set.of(getFullyQualifiedConfigName( + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC)), CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevelAfterGC(), + accountant.getWatcherTask().getQueryMonitorConfig().getCriticalLevel() + - EXPECTED_CRITICAL_LEVEL_AFTER_GC * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize()); + } + + @Test + void testAlarmingLevelHeapUsageRatioConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(), + CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO * accountant.getWatcherTask() + .getQueryMonitorConfig().getMaxHeapSize()); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(), + EXPECTED_ALARMING_LEVEL * accountant.getWatcherTask().getQueryMonitorConfig().getMaxHeapSize()); + } + + @Test + void testSleepTimeConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime(), + CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime(), EXPECTED_NORMAL_SLEEP_TIME); + } + + @Test + void testSleepTimeDenominatorConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingSleepTime(), + accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime() + / CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getAlarmingSleepTime(), + accountant.getWatcherTask().getQueryMonitorConfig().getNormalSleepTime() + / EXPECTED_ALARMING_SLEEP_TIME_DENOMINATOR); + } + + @Test + void testMinMemoryFootprintToKillRatioConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getMinMemoryFootprintForKill(), + (long) (CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO * accountant.getWatcherTask() + .getQueryMonitorConfig().getMaxHeapSize())); + accountant.getWatcherTask().onChange( + Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getMinMemoryFootprintForKill(), + (long) (EXPECTED_MIN_MEMORY_FOOTPRINT_FOR_KILL * accountant.getWatcherTask().getQueryMonitorConfig() + .getMaxHeapSize())); + } + + @Test + void testGCBackoffCountConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(), + CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcBackoffCount(), EXPECTED_GC_BACKOFF_COUNT); + } + + @Test + void testGCWaitTimeConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(), + CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS)), + CLUSTER_CONFIGS); + assertEquals(accountant.getWatcherTask().getQueryMonitorConfig().getGcWaitTime(), EXPECTED_GC_WAIT_TIME); + } + + @Test + void testQueryKilledMetricEnabledConfigChange() { + PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = + new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(), "test", + InstanceType.SERVER); + + assertFalse(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled()); + accountant.getWatcherTask() + .onChange(Set.of(getFullyQualifiedConfigName(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED)), + CLUSTER_CONFIGS); + assertTrue(accountant.getWatcherTask().getQueryMonitorConfig().isQueryKilledMetricEnabled()); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 88313fd828..fa8b8fa6bc 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -675,6 +675,11 @@ public abstract class BaseServerStarter implements ServiceStartable { Tracing.ThreadAccountantOps.initializeThreadAccountant( _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId, org.apache.pinot.spi.config.instance.InstanceType.SERVER); + if (Tracing.getThreadAccountant().getClusterConfigChangeListener() != null) { + _clusterConfigChangeHandler.registerClusterConfigChangeListener( + Tracing.getThreadAccountant().getClusterConfigChangeListener()); + } + initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index dd9a8e9c3f..b3a270aea1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -21,6 +21,7 @@ package org.apache.pinot.spi.accounting; import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; +import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; public interface ThreadResourceUsageAccountant { @@ -98,6 +99,11 @@ public interface ThreadResourceUsageAccountant { */ void startWatcherTask(); + @Nullable + default PinotClusterConfigChangeListener getClusterConfigChangeListener() { + return null; + } + /** * get error status if the query is preempted * @return empty string if N/A --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org