Copilot commented on code in PR #16445:
URL: https://github.com/apache/pinot/pull/16445#discussion_r2242533332


##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java:
##########
@@ -54,18 +66,21 @@ void testQueryAggregation() {
    */
   @Test
   void testQueryAggregationCreateNewTask() {
-    Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries 
= new HashMap<>();
     CountDownLatch threadLatch = new CountDownLatch(1);
     String queryId = "testQueryAggregationCreateNewTask";
-    TestResourceAccountant.getQueryThreadEntries(queryId, threadLatch, 
threadEntries);
-    TestResourceAccountant accountant = new 
TestResourceAccountant(threadEntries);
+    TestResourceAccountant accountant = new TestResourceAccountant();
+    Tracing.register(accountant);
+    startQueryThreads(queryId, threadLatch, new AtomicInteger(0), 
List.of(1000, 2000, 2500));
+
+    // Ensure the Accountant state is correctly initialized
+    waitForQueryResourceTracker(accountant, queryId, 5500);
 
     TestResourceAccountant.TaskThread anchorThread =
         accountant.getTaskThread(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID);
     assertNotNull(anchorThread);
 
-    // Replace task id = 3 (2500 bytes) with a new task id 5 (1500 bytes)
-    TestResourceAccountant.TaskThread workerEntry = 
accountant.getTaskThread(queryId, 3);
+    // Replace task id = 2 (2500 bytes) with a new task id 5 (1500 bytes)

Review Comment:
   The comment states 'task id = 2 (2500 bytes)', but in the original test 
setup task id 2 had 2000 bytes and task id 3 had 2500 bytes; in the new setup, 
2500 bytes is the third value in the list (index 2). The comment should clarify 
which task is being referenced.
   ```suggestion
       // Replace task id = 2 (2000 bytes) with a new task id 5 (1500 bytes)
   ```



##########
pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java:
##########
@@ -18,67 +18,124 @@
  */
 package org.apache.pinot.core.accounting;
 
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
-import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 class TestResourceAccountant extends 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
-  TestResourceAccountant(Map<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries) {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TestResourceAccountant.class);
+  private long _heapUsageBytes = 0;
+
+  TestResourceAccountant() {
     super(new PinotConfiguration(), false, true, true, new HashSet<>(), 
"test", InstanceType.SERVER);
-    _threadEntriesMap.putAll(threadEntries);
   }
 
-  static void getQueryThreadEntries(String queryId, CountDownLatch threadLatch,
-      Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> 
threadEntries) {
-    TaskThread
-        anchorThread = getTaskThread(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, threadLatch, null);
-    threadEntries.put(anchorThread._workerThread, anchorThread._threadEntry);
-    anchorThread._threadEntry._currentThreadMemoryAllocationSampleBytes = 1000;
-
-    CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
-    anchorEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID,
-            ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
-    anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000;
-    threadEntries.put(anchorThread._workerThread, anchorEntry);
-
-    TaskThread taskThread2 = getTaskThread(queryId, 2, threadLatch, 
anchorThread._workerThread);
-    threadEntries.put(taskThread2._workerThread, taskThread2._threadEntry);
-    taskThread2._threadEntry._currentThreadMemoryAllocationSampleBytes = 2000;
-
-    TaskThread taskThread3 = getTaskThread(queryId, 3, threadLatch, 
anchorThread._workerThread);
-    threadEntries.put(taskThread3._workerThread, taskThread3._threadEntry);
-    taskThread3._threadEntry._currentThreadMemoryAllocationSampleBytes = 2500;
+  public void setHeapUsageBytes(long heapUsageBytes) {
+    _heapUsageBytes = heapUsageBytes;
+  }
+
+  @Override
+  public WatcherTask createWatcherTask() {
+    return new TestResourceWatcherTask();
   }
 
-  private static TaskThread getTaskThread(String queryId, int taskId, 
CountDownLatch threadLatch, Thread anchorThread) {
-    CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
-    worker1._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, 
ThreadExecutionContext.TaskType.SSE,
-            anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
-    Thread workerThread1 = new Thread(() -> {
+  class TestResourceWatcherTask extends WatcherTask {
+    TestResourceWatcherTask() {
+      PinotConfiguration config = getPinotConfiguration();
+      QueryMonitorConfig queryMonitorConfig = new QueryMonitorConfig(config, 
1000);
+      _queryMonitorConfig.set(queryMonitorConfig);
+    }
+
+    @Override
+    public void runOnce() {
+      _aggregatedUsagePerActiveQuery = null;
+      _triggeringLevel = TriggeringLevel.Normal;
       try {
-        threadLatch.await();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+        evalTriggers();
+        reapFinishedTasks();
+        _aggregatedUsagePerActiveQuery = getQueryResourcesImpl();
+        _maxHeapUsageQuery.set(_aggregatedUsagePerActiveQuery.values().stream()
+            .filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
+            
.max(Comparator.comparing(AggregatedStats::getAllocatedBytes)).orElse(null));
+        triggeredActions();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while executing stats aggregation and 
query kill", e);
+      } finally {
+        // Clean inactive query stats
+        cleanInactive();
       }
-    });
-    workerThread1.start();
-    return new TaskThread(worker1, workerThread1);
+    }
+
+    @Override
+    public long getHeapUsageBytes() {
+      return _heapUsageBytes;
+    }
+  }
+
+  void setCriticalLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+    PinotConfiguration config = getPinotConfiguration();
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
 ratio);
+    _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config, 
maxHeapSize));
+  }
+
+  void setPanicLevelHeapUsageRatio(long maxHeapSize, double ratio) {
+    PinotConfiguration config = getPinotConfiguration();
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
 ratio);
+    _watcherTask._queryMonitorConfig.set(new QueryMonitorConfig(config, 
maxHeapSize));
+  }
+
+  private static @NotNull PinotConfiguration getPinotConfiguration() {
+    PinotConfiguration config = new PinotConfiguration();
+
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
 0.01);
+
+    
config.setProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
+        CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO);

Review Comment:
   There is a typo in the constant name 'DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO'. 
It should be 'DEFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO'.
   ```suggestion
           CommonConstants.Accounting.DEFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO);
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -914,19 +935,37 @@ void reschedule() {
       void killAllQueries() {
         QueryMonitorConfig config = _queryMonitorConfig.get();
 
-        if (config.isOomKillQueryEnabled()) {
+        if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) 
{
           int killedCount = 0;
           for (Map.Entry<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : 
_threadEntriesMap.entrySet()) {

Review Comment:
   Iterating over `_threadEntriesMap` without synchronization while other 
threads may be modifying it could lead to ConcurrentModificationException. 
Consider using appropriate synchronization or a thread-safe iteration approach.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -319,8 +321,27 @@ public boolean isAnchorThreadInterrupted() {
     @Override
     public boolean isQueryTerminated() {
       QueryMonitorConfig config = _watcherTask.getQueryMonitorConfig();
-      if (config.isThreadSelfTerminate() && _watcherTask.getHeapUsageBytes() > 
config.getPanicLevel()) {
-        logSelfTerminatedQuery(_threadLocalEntry.get().getQueryId(), 
Thread.currentThread());
+      // if self-termination is not enabled, no need to check resource usage
+      if (!config.isThreadSelfTerminate()) {
+        return false;
+      }
+
+      long heapUsageBytes = _watcherTask.getHeapUsageBytes();
+      String queryId = _threadLocalEntry.get().getQueryId();
+      AggregatedStats maxResourceUsageQuery = _maxHeapUsageQuery.get();
+
+      if (heapUsageBytes > config.getPanicLevel()) {
+        if (_cancelSentQueries.add(queryId)) {

Review Comment:
   The `_cancelSentQueries.add()` check-and-act pattern is not atomic. Multiple 
threads could pass the condition simultaneously before any can add the queryId, 
potentially leading to duplicate termination logging or processing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to