This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a52c2a25cc Reduce logs and improve logging when queries are terminated due to OOM. (#16172) a52c2a25cc is described below commit a52c2a25cc469d88072e38fdd75c7293cda0c9cb Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Wed Jun 25 12:00:42 2025 +0530 Reduce logs and improve logging when queries are terminated due to OOM. (#16172) --- .../PerQueryCPUMemAccountantFactory.java | 39 +++++++++------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index df110358cb..f733e81ba7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -109,7 +109,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory CPUMemThreadLevelAccountingObjects.ThreadEntry ret = new CPUMemThreadLevelAccountingObjects.ThreadEntry(); _threadEntriesMap.put(Thread.currentThread(), ret); - LOGGER.info("Adding thread to _threadLocalEntry: {}", Thread.currentThread().getName()); + LOGGER.debug("Adding thread to _threadLocalEntry: {}", Thread.currentThread().getName()); return ret; } ); @@ -456,7 +456,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory if (!thread.isAlive()) { _threadEntriesMap.remove(thread); - LOGGER.info("Removing thread from _threadLocalEntry: {}", thread.getName()); + LOGGER.debug("Removing thread from _threadLocalEntry: {}", thread.getName()); } } @@ -482,6 +482,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) { } + protected void logQueryResourceUsage(Map<String, ? extends QueryResourceTracker> aggregatedUsagePerActiveQuery) { + LOGGER.warn("Query aggregation results {} for the previous kill.", aggregatedUsagePerActiveQuery); + } + + protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage) { + LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}", + queryResourceTracker.getQueryId(), queryResourceTracker.getAllocatedBytes(), + queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage); + } + @Override public Exception getErrorStatus() { return _threadLocalEntry.get()._errorStatus.getAndSet(null); @@ -892,9 +902,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota", maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId))); interruptRunnerThread(maxUsageTuple.getAnchorThread()); - LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}", - maxUsageTuple._queryId, maxUsageTuple._allocatedBytes); - LOGGER.error("Current task status recorded is {}", _threadEntriesMap); + logTerminatedQuery(maxUsageTuple, _usedBytes); } else if (!_oomKillQueryEnabled) { LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false " + "because oomKillQueryEnabled is false", @@ -902,25 +910,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } else { LOGGER.warn("But all queries are below quota, no query killed"); } - } else { - maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(), - Comparator.comparing(AggregatedStats::getCpuTimeNs)); - if (_oomKillQueryEnabled) { - maxUsageTuple._exceptionAtomicReference - .set(new RuntimeException(String.format( - " Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s", - maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId))); - interruptRunnerThread(maxUsageTuple.getAnchorThread()); - LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true", - maxUsageTuple._allocatedBytes, maxUsageTuple._queryId); - LOGGER.error("Current task status recorded is {}", _threadEntriesMap); - } else { - LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false " - + "because oomKillQueryEnabled is false", - maxUsageTuple._queryId, maxUsageTuple._allocatedBytes); - } } - LOGGER.warn("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery.toString()); + logQueryResourceUsage(_aggregatedUsagePerActiveQuery); } private void killCPUTimeExceedQueries() { @@ -935,8 +926,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory + "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId, value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS))); interruptRunnerThread(value.getAnchorThread()); + logTerminatedQuery(value, _usedBytes); } } + logQueryResourceUsage(_aggregatedUsagePerActiveQuery); } private void interruptRunnerThread(Thread thread) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org