gortiz commented on code in PR #16445:
URL: https://github.com/apache/pinot/pull/16445#discussion_r2251022060


##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -134,6 +134,8 @@ public static class PerQueryCPUMemResourceUsageAccountant 
implements ThreadResou
 
     protected Set<String> _cancelSentQueries;
 
+    protected AtomicReference<AggregatedStats> _maxHeapUsageQuery = new 
AtomicReference<>(null);

Review Comment:
   Each time we modify this class, I have to spend a long time remembering 
which thread touches each attribute. Can we add a javadoc on the attributes 
indicating the thread constraint?



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -880,19 +901,37 @@ void reschedule() {
       void killAllQueries() {
         QueryMonitorConfig config = _queryMonitorConfig.get();
 
-        if (config.isOomKillQueryEnabled()) {
+        if (config.isOomKillQueryEnabled() && !config.isThreadSelfTerminate()) 
{
           int killedCount = 0;
           for (Map.Entry<Thread, 
CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : 
_threadEntriesMap.entrySet()) {
             CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
entry.getValue();
             CPUMemThreadLevelAccountingObjects.TaskEntry taskEntry = 
threadEntry.getCurrentThreadTaskStatus();
-            if (taskEntry != null && 
!_cancelSentQueries.contains(taskEntry.getQueryId())) {
-              cancelQuery(taskEntry.getQueryId(), taskEntry.getAnchorThread());
+            if (taskEntry != null && 
_cancelSentQueries.add(taskEntry.getQueryId())) {
+              String queryId = taskEntry.getQueryId();
+              // The cache will be invalidated after the termination is logged.
+              MseCancelCallback callback = 
_queryCancelCallbacks.getIfPresent(queryId);
+              if (callback != null) {
+                callback.cancelQuery(Long.parseLong(queryId));
+              } else {
+                taskEntry.getAnchorThread().interrupt();
+              }

Review Comment:
   This is almost the same code that we have in `cancelQuery`. I think it would 
be better to extract the code into an internal cancel query.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -539,15 +560,18 @@ protected void logQueryResourceUsage(Map<String, ? 
extends QueryResourceTracker>
       LOGGER.warn("Query aggregation results {} for the previous kill.", 
aggregatedUsagePerActiveQuery);
     }
 
-    public void cancelQuery(String queryId, Thread anchorThread) {
-      MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
-      if (callback != null) {
-        callback.cancelQuery(Long.parseLong(queryId));
-        _queryCancelCallbacks.invalidate(queryId);
-      } else {
-        anchorThread.interrupt();
+    public void cancelQuery(QueryResourceTracker queryResourceTracker, Thread 
anchorThread) {
+      String queryId = queryResourceTracker.getQueryId();
+      if (_cancelSentQueries.add(queryId)) {
+        MseCancelCallback callback = 
_queryCancelCallbacks.getIfPresent(queryId);
+        if (callback != null) {
+          callback.cancelQuery(Long.parseLong(queryId));
+          _queryCancelCallbacks.invalidate(queryId);
+        } else {
+          anchorThread.interrupt();
+        }

Review Comment:
   I think we already discuss about that. IIRC the anchor thread interruption 
is what is used to kill SSE queries. I think it would be better (easier to read 
and maintain) if we move that anchor thread interruption into a callback we 
register for SSE queries in the same way we have a custom callback for MSE



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to