Copilot commented on code in PR #16445:
URL: https://github.com/apache/pinot/pull/16445#discussion_r2239549138


##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -319,8 +321,27 @@ public boolean isAnchorThreadInterrupted() {
     @Override
     public boolean isQueryTerminated() {
       QueryMonitorConfig config = _watcherTask.getQueryMonitorConfig();
-      if (config.isThreadSelfTerminate() && _watcherTask.getHeapUsageBytes() > 
config.getPanicLevel()) {
-        logSelfTerminatedQuery(_threadLocalEntry.get().getQueryId(), 
Thread.currentThread());
+      if (!config.isThreadSelfTerminate()) {
+        // if self-termination is not enabled, no need to check resource usage
+        return false;
+      }
+
+      long heapUsageBytes = _watcherTask.getHeapUsageBytes();
+      String queryId = _threadLocalEntry.get().getQueryId();
+      AggregatedStats maxResourceUsageQuery = _maxHeapUsageQuery.get();
+
+      if (heapUsageBytes > config.getPanicLevel()) {
+        if (_cancelSentQueries.add(queryId)) {
+          logTerminatedQuery(maxResourceUsageQuery, heapUsageBytes, false);
+        }
+        return true;
+      }
+
+      if (heapUsageBytes > config.getCriticalLevel() && maxResourceUsageQuery 
!= null
+          && maxResourceUsageQuery.getQueryId().equals(queryId)) {
+        if (_cancelSentQueries.add(queryId)) {
+          logTerminatedQuery(maxResourceUsageQuery, heapUsageBytes, false);

Review Comment:
   The `logTerminatedQuery` method expects a `QueryResourceTracker` but 
`maxResourceUsageQuery` is of type `AggregatedStats`. This will cause a 
compilation error or incorrect method resolution.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -319,8 +321,27 @@ public boolean isAnchorThreadInterrupted() {
     @Override
     public boolean isQueryTerminated() {
       QueryMonitorConfig config = _watcherTask.getQueryMonitorConfig();
-      if (config.isThreadSelfTerminate() && _watcherTask.getHeapUsageBytes() > 
config.getPanicLevel()) {
-        logSelfTerminatedQuery(_threadLocalEntry.get().getQueryId(), 
Thread.currentThread());
+      if (!config.isThreadSelfTerminate()) {
+        // if self-termination is not enabled, no need to check resource usage
+        return false;
+      }
+
+      long heapUsageBytes = _watcherTask.getHeapUsageBytes();
+      String queryId = _threadLocalEntry.get().getQueryId();
+      AggregatedStats maxResourceUsageQuery = _maxHeapUsageQuery.get();
+
+      if (heapUsageBytes > config.getPanicLevel()) {
+        if (_cancelSentQueries.add(queryId)) {
+          logTerminatedQuery(maxResourceUsageQuery, heapUsageBytes, false);

Review Comment:
   Same issue as line 343: `logTerminatedQuery` method expects a 
`QueryResourceTracker` but `maxResourceUsageQuery` is of type 
`AggregatedStats`. This will cause a compilation error.
   ```suggestion
             QueryResourceTracker queryResourceTracker = 
convertToQueryResourceTracker(maxResourceUsageQuery);
             logTerminatedQuery(queryResourceTracker, heapUsageBytes, false);
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -573,32 +594,27 @@ protected void logQueryResourceUsage(Map<String, ? 
extends QueryResourceTracker>
       LOGGER.warn("Query aggregation results {} for the previous kill.", 
aggregatedUsagePerActiveQuery);
     }
 
-    public void cancelQuery(String queryId, Thread anchorThread) {
-      MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
-      if (callback != null) {
-        callback.cancelQuery(Long.parseLong(queryId));
-        _queryCancelCallbacks.invalidate(queryId);
-      } else {
-        anchorThread.interrupt();
+    public void cancelQuery(AggregatedStats queryResourceTracker) {
+      String queryId = queryResourceTracker.getQueryId();
+      if (_cancelSentQueries.add(queryId)) {
+        MseCancelCallback callback = 
_queryCancelCallbacks.getIfPresent(queryId);
+        if (callback != null) {
+          callback.cancelQuery(Long.parseLong(queryId));
+          _queryCancelCallbacks.invalidate(queryId);
+        } else {
+          queryResourceTracker.getAnchorThread().interrupt();
+        }
+        logTerminatedQuery(queryResourceTracker, 
_watcherTask.getHeapUsageBytes(), callback != null);

Review Comment:
   The parameter `queryResourceTracker` is of type `AggregatedStats` but 
`logTerminatedQuery` expects a `QueryResourceTracker`. This type mismatch will 
cause compilation issues.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -813,6 +829,11 @@ public void runOnce() {
           reapFinishedTasks();
           if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) {
             _aggregatedUsagePerActiveQuery = getQueryResourcesImpl();
+            
_maxHeapUsageQuery.set(_aggregatedUsagePerActiveQuery.values().stream()

Review Comment:
   The stream operation with filter and max comparison is executed on every 
watcher task run when triggering level is above Normal. Consider caching this 
computation or only updating when the query set changes to avoid repeated 
expensive operations.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -573,32 +594,27 @@ protected void logQueryResourceUsage(Map<String, ? 
extends QueryResourceTracker>
       LOGGER.warn("Query aggregation results {} for the previous kill.", 
aggregatedUsagePerActiveQuery);
     }
 
-    public void cancelQuery(String queryId, Thread anchorThread) {
-      MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
-      if (callback != null) {
-        callback.cancelQuery(Long.parseLong(queryId));
-        _queryCancelCallbacks.invalidate(queryId);
-      } else {
-        anchorThread.interrupt();
+    public void cancelQuery(AggregatedStats queryResourceTracker) {
+      String queryId = queryResourceTracker.getQueryId();
+      if (_cancelSentQueries.add(queryId)) {
+        MseCancelCallback callback = 
_queryCancelCallbacks.getIfPresent(queryId);
+        if (callback != null) {
+          callback.cancelQuery(Long.parseLong(queryId));
+          _queryCancelCallbacks.invalidate(queryId);
+        } else {
+          queryResourceTracker.getAnchorThread().interrupt();
+        }
+        logTerminatedQuery(queryResourceTracker, 
_watcherTask.getHeapUsageBytes(), callback != null);
       }
-      _cancelSentQueries.add(queryId);
     }
 
     protected void logTerminatedQuery(QueryResourceTracker 
queryResourceTracker, long totalHeapMemoryUsage,
         boolean hasCallback) {
-      LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total 
Heap Usage: {}. Used Callback: {}",
+      LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total 
Heap Usage: {}. Used Callback: {}.",

Review Comment:
   [nitpick] The period at the end of the log message is inconsistent with 
other log messages in the codebase. Consider removing it for consistency.
   ```suggestion
         LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. 
Total Heap Usage: {}. Used Callback: {}",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to