This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a52c2a25cc Reduce logs and improve logging when queries are terminated due to OOM. (#16172)
a52c2a25cc is described below

commit a52c2a25cc469d88072e38fdd75c7293cda0c9cb
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Wed Jun 25 12:00:42 2025 +0530

    Reduce logs and improve logging when queries are terminated due to OOM. (#16172)
---
 .../PerQueryCPUMemAccountantFactory.java           | 39 +++++++++-------------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index df110358cb..f733e81ba7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -109,7 +109,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           CPUMemThreadLevelAccountingObjects.ThreadEntry ret =
               new CPUMemThreadLevelAccountingObjects.ThreadEntry();
           _threadEntriesMap.put(Thread.currentThread(), ret);
-          LOGGER.info("Adding thread to _threadLocalEntry: {}", 
Thread.currentThread().getName());
+          LOGGER.debug("Adding thread to _threadLocalEntry: {}", 
Thread.currentThread().getName());
           return ret;
         }
     );
@@ -456,7 +456,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
         if (!thread.isAlive()) {
           _threadEntriesMap.remove(thread);
-          LOGGER.info("Removing thread from _threadLocalEntry: {}", 
thread.getName());
+          LOGGER.debug("Removing thread from _threadLocalEntry: {}", 
thread.getName());
         }
       }
 
@@ -482,6 +482,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     public void postAggregation(Map<String, AggregatedStats> 
aggregatedUsagePerActiveQuery) {
     }
 
+    protected void logQueryResourceUsage(Map<String, ? extends 
QueryResourceTracker> aggregatedUsagePerActiveQuery) {
+      LOGGER.warn("Query aggregation results {} for the previous kill.", 
aggregatedUsagePerActiveQuery);
+    }
+
+    protected void logTerminatedQuery(QueryResourceTracker 
queryResourceTracker, long totalHeapMemoryUsage) {
+      LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total 
Heap Usage: {}",
+          queryResourceTracker.getQueryId(), 
queryResourceTracker.getAllocatedBytes(),
+          queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage);
+    }
+
     @Override
     public Exception getErrorStatus() {
       return _threadLocalEntry.get()._errorStatus.getAndSet(null);
@@ -892,9 +902,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
                     " Query %s got killed because using %d bytes of memory on 
%s: %s, exceeding the quota",
                     maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), 
_instanceType, _instanceId)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
-            LOGGER.error("Query {} got picked because using {} bytes of 
memory, actual kill committed true}",
-                maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
-            LOGGER.error("Current task status recorded is {}", 
_threadEntriesMap);
+            logTerminatedQuery(maxUsageTuple, _usedBytes);
           } else if (!_oomKillQueryEnabled) {
             LOGGER.warn("Query {} got picked because using {} bytes of memory, 
actual kill committed false "
                     + "because oomKillQueryEnabled is false",
@@ -902,25 +910,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           } else {
             LOGGER.warn("But all queries are below quota, no query killed");
           }
-        } else {
-          maxUsageTuple = 
Collections.max(_aggregatedUsagePerActiveQuery.values(),
-              Comparator.comparing(AggregatedStats::getCpuTimeNs));
-          if (_oomKillQueryEnabled) {
-            maxUsageTuple._exceptionAtomicReference
-                .set(new RuntimeException(String.format(
-                    " Query %s got killed because memory pressure, using %d ns 
of CPU time on %s: %s",
-                    maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), 
_instanceType, _instanceId)));
-            interruptRunnerThread(maxUsageTuple.getAnchorThread());
-            LOGGER.error("Query {} got picked because using {} ns of cpu time, 
actual kill committed true",
-                maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
-            LOGGER.error("Current task status recorded is {}", 
_threadEntriesMap);
-          } else {
-            LOGGER.warn("Query {} got picked because using {} bytes of memory, 
actual kill committed false "
-                    + "because oomKillQueryEnabled is false",
-                maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
-          }
         }
-        LOGGER.warn("Query aggregation results {} for the previous kill.", 
_aggregatedUsagePerActiveQuery.toString());
+        logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
       }
 
       private void killCPUTimeExceedQueries() {
@@ -935,8 +926,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
                         + "CPU time exceeding limit of %d ns CPU time", 
value._queryId, _instanceType, _instanceId,
                     value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS)));
             interruptRunnerThread(value.getAnchorThread());
+            logTerminatedQuery(value, _usedBytes);
           }
         }
+        logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
       }
 
       private void interruptRunnerThread(Thread thread) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to