vrajat commented on code in PR #16445:
URL: https://github.com/apache/pinot/pull/16445#discussion_r2251728822


##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java:
##########
@@ -18,67 +18,124 @@
  */
 package org.apache.pinot.core.accounting;
 
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 class TestResourceAccountant extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
-  TestResourceAccountant(Map<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries) {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TestResourceAccountant.class);
+  private long _heapUsageBytes = 0;
+
+  TestResourceAccountant() {
     super(new PinotConfiguration(), false, true, true, new HashSet<>(), 
"test", InstanceType.SERVER);
-    _threadEntriesMap.putAll(threadEntries);
   }
 
-  static void getQueryThreadEntries(String queryId, CountDownLatch threadLatch,
-      Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> 
threadEntries) {
-    TaskThread
-        anchorThread = getTaskThread(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, threadLatch, null);
-    threadEntries.put(anchorThread._workerThread, anchorThread._threadEntry);
-    anchorThread._threadEntry._currentThreadMemoryAllocationSampleBytes = 1000;
-
-    CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
-    anchorEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID,
-            ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
-    anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000;
-    threadEntries.put(anchorThread._workerThread, anchorEntry);
-
-    TaskThread taskThread2 = getTaskThread(queryId, 2, threadLatch, 
anchorThread._workerThread);
-    threadEntries.put(taskThread2._workerThread, taskThread2._threadEntry);
-    taskThread2._threadEntry._currentThreadMemoryAllocationSampleBytes = 2000;
-
-    TaskThread taskThread3 = getTaskThread(queryId, 3, threadLatch, 
anchorThread._workerThread);
-    threadEntries.put(taskThread3._workerThread, taskThread3._threadEntry);
-    taskThread3._threadEntry._currentThreadMemoryAllocationSampleBytes = 2500;
+  public void setHeapUsageBytes(long heapUsageBytes) {
+    _heapUsageBytes = heapUsageBytes;
+  }
+
+  @Override
+  public WatcherTask createWatcherTask() {
+    return new TestResourceWatcherTask();
   }
 
-  private static TaskThread getTaskThread(String queryId, int taskId, 
CountDownLatch threadLatch, Thread anchorThread) {
-    CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
-    worker1._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, 
ThreadExecutionContext.TaskType.SSE,
-            anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
-    Thread workerThread1 = new Thread(() -> {
+  class TestResourceWatcherTask extends WatcherTask {
+    TestResourceWatcherTask() {
+      PinotConfiguration config = getPinotConfiguration();
+      QueryMonitorConfig queryMonitorConfig = new QueryMonitorConfig(config, 
1000);
+      _queryMonitorConfig.set(queryMonitorConfig);
+    }
+
+    @Override
+    public void runOnce() {
+      _aggregatedUsagePerActiveQuery = null;
+      _triggeringLevel = TriggeringLevel.Normal;
       try {
-        threadLatch.await();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+        evalTriggers();
+        reapFinishedTasks();
+        _aggregatedUsagePerActiveQuery = getQueryResourcesImpl();
+        _maxHeapUsageQuery.set(_aggregatedUsagePerActiveQuery.values().stream()
+            .filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
+            
.max(Comparator.comparing(AggregatedStats::getAllocatedBytes)).orElse(null));
+        triggeredActions();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while executing stats aggregation and 
query kill", e);
+      } finally {
+        // Clean inactive query stats
+        cleanInactive();
       }
-    });
-    workerThread1.start();
-    return new TaskThread(worker1, workerThread1);
+    }
+
+    @Override
+    public long getHeapUsageBytes() {
+      return _heapUsageBytes;
+    }
+  }
+
+  void setCriticalLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+    PinotConfiguration config = getPinotConfiguration();
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
 ratio);
+    _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config, 
maxHeapSize));
+  }
+
+  void setPanicLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+    PinotConfiguration config = getPinotConfiguration();
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
 ratio);
+    _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config, 
maxHeapSize));
+  }
+
+  private static @NotNull PinotConfiguration getPinotConfiguration() {
+    PinotConfiguration config = new PinotConfiguration();
+
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
 0.01);
+
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
+        CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO);

Review Comment:
   I'm not going to change this because doing so causes the backward
compatibility test to fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to