vrajat commented on code in PR #16445:
URL: https://github.com/apache/pinot/pull/16445#discussion_r2251728822
##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java:
##########
@@ -18,67 +18,124 @@
*/
package org.apache.pinot.core.accounting;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
-import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class TestResourceAccountant extends
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
- TestResourceAccountant(Map<Thread,
CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries) {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestResourceAccountant.class);
+ private long _heapUsageBytes = 0;
+
+ TestResourceAccountant() {
super(new PinotConfiguration(), false, true, true, new HashSet<>(),
"test", InstanceType.SERVER);
- _threadEntriesMap.putAll(threadEntries);
}
- static void getQueryThreadEntries(String queryId, CountDownLatch threadLatch,
- Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry>
threadEntries) {
- TaskThread
- anchorThread = getTaskThread(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID, threadLatch, null);
- threadEntries.put(anchorThread._workerThread, anchorThread._threadEntry);
- anchorThread._threadEntry._currentThreadMemoryAllocationSampleBytes = 1000;
-
- CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new
CPUMemThreadLevelAccountingObjects.ThreadEntry();
- anchorEntry._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId,
CommonConstants.Accounting.ANCHOR_TASK_ID,
- ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread,
- CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
- anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000;
- threadEntries.put(anchorThread._workerThread, anchorEntry);
-
- TaskThread taskThread2 = getTaskThread(queryId, 2, threadLatch,
anchorThread._workerThread);
- threadEntries.put(taskThread2._workerThread, taskThread2._threadEntry);
- taskThread2._threadEntry._currentThreadMemoryAllocationSampleBytes = 2000;
-
- TaskThread taskThread3 = getTaskThread(queryId, 3, threadLatch,
anchorThread._workerThread);
- threadEntries.put(taskThread3._workerThread, taskThread3._threadEntry);
- taskThread3._threadEntry._currentThreadMemoryAllocationSampleBytes = 2500;
+ public void setHeapUsageBytes(long heapUsageBytes) {
+ _heapUsageBytes = heapUsageBytes;
+ }
+
+ @Override
+ public WatcherTask createWatcherTask() {
+ return new TestResourceWatcherTask();
}
- private static TaskThread getTaskThread(String queryId, int taskId,
CountDownLatch threadLatch, Thread anchorThread) {
- CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new
CPUMemThreadLevelAccountingObjects.ThreadEntry();
- worker1._currentThreadTaskStatus.set(
- new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId,
ThreadExecutionContext.TaskType.SSE,
- anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
- Thread workerThread1 = new Thread(() -> {
+ class TestResourceWatcherTask extends WatcherTask {
+ TestResourceWatcherTask() {
+ PinotConfiguration config = getPinotConfiguration();
+ QueryMonitorConfig queryMonitorConfig = new QueryMonitorConfig(config,
1000);
+ _queryMonitorConfig.set(queryMonitorConfig);
+ }
+
+ @Override
+ public void runOnce() {
+ _aggregatedUsagePerActiveQuery = null;
+ _triggeringLevel = TriggeringLevel.Normal;
try {
- threadLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ evalTriggers();
+ reapFinishedTasks();
+ _aggregatedUsagePerActiveQuery = getQueryResourcesImpl();
+ _maxHeapUsageQuery.set(_aggregatedUsagePerActiveQuery.values().stream()
+ .filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
+
.max(Comparator.comparing(AggregatedStats::getAllocatedBytes)).orElse(null));
+ triggeredActions();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while executing stats aggregation and
query kill", e);
+ } finally {
+ // Clean inactive query stats
+ cleanInactive();
}
- });
- workerThread1.start();
- return new TaskThread(worker1, workerThread1);
+ }
+
+ @Override
+ public long getHeapUsageBytes() {
+ return _heapUsageBytes;
+ }
+ }
+
+ void setCriticalLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+ PinotConfiguration config = getPinotConfiguration();
+
config.setProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
ratio);
+ _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config,
maxHeapSize));
+ }
+
+ void setPanicLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+ PinotConfiguration config = getPinotConfiguration();
+
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
ratio);
+ _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config,
maxHeapSize));
+ }
+
+ private static @NotNull PinotConfiguration getPinotConfiguration() {
+ PinotConfiguration config = new PinotConfiguration();
+
+
config.setProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
0.01);
+
+
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
+ CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO);
Review Comment:
Not going to change this because it causes backward compatibility test to
fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]