This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 325054d28d4 Reduce log for PerQueryCPUMemResourceUsageAccountant (#16642)
325054d28d4 is described below

commit 325054d28d4ff27ca7273c48972b19bdb7241e0a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 20 12:26:50 2025 -0600

    Reduce log for PerQueryCPUMemResourceUsageAccountant (#16642)
---
 .../PerQueryCPUMemAccountantFactory.java           | 221 +++++++++++----------
 .../pinot/core/accounting/QueryAggregator.java     |   3 +-
 .../core/accounting/TestResourceAccountant.java    |   2 +-
 ...flineClusterMemBasedBrokerQueryKillingTest.java |  21 +-
 ...flineClusterMemBasedServerQueryKillingTest.java |  41 ++--
 ...fflineClusterServerCPUTimeQueryKillingTest.java |  17 +-
 6 files changed, 145 insertions(+), 160 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 79f5d671966..efc8709f941 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.accounting;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.Collection;
@@ -141,22 +142,21 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     protected final InstanceType _instanceType;
 
     protected PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, boolean isThreadCPUSamplingEnabled,
-        boolean isThreadMemorySamplingEnabled, boolean isThreadSamplingEnabledForMSE, Set<String> inactiveQuery,
-        String instanceId, InstanceType instanceType) {
+        boolean isThreadMemorySamplingEnabled, Set<String> inactiveQuery, String instanceId,
+        InstanceType instanceType) {
       _config = config;
       _isThreadCPUSamplingEnabled = isThreadCPUSamplingEnabled;
       _isThreadMemorySamplingEnabled = isThreadMemorySamplingEnabled;
       _inactiveQuery = inactiveQuery;
       _instanceId = instanceId;
       _instanceType = instanceType;
-      _cancelSentQueries = new HashSet<>();
+      _cancelSentQueries = ConcurrentHashMap.newKeySet();
       _watcherTask = createWatcherTask();
       _queryCancelCallbacks = CacheBuilder.newBuilder().build();
     }
 
     public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId,
         InstanceType instanceType) {
-
       LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
       _config = config;
       _instanceId = instanceId;
@@ -191,7 +191,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
       // task/query tracking
       _inactiveQuery = new HashSet<>();
-      _cancelSentQueries = new HashSet<>();
+      _cancelSentQueries = ConcurrentHashMap.newKeySet();
       _watcherTask = createWatcherTask();
     }
 
@@ -517,9 +517,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     }
 
     protected void logQueryResourceUsage(Map<String, ? extends QueryResourceTracker> aggregatedUsagePerActiveQuery) {
-      LOGGER.warn("Query aggregation results {} for the previous kill.", aggregatedUsagePerActiveQuery);
+      LOGGER.debug("Query aggregation results: {} for the previous kill.", aggregatedUsagePerActiveQuery);
     }
 
+    @VisibleForTesting
     public void cancelQuery(String queryId, Thread anchorThread) {
       MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
       if (callback != null) {
@@ -533,16 +534,15 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
     protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage,
         boolean hasCallback) {
-      LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}. Used Callback: {}",
+      LOGGER.warn("Query: {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}. Used Callback: {}",
           queryResourceTracker.getQueryId(), queryResourceTracker.getAllocatedBytes(),
           queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage, hasCallback);
     }
 
     protected void logSelfTerminatedQuery(String queryId, Thread queryThread) {
-      if (!_cancelSentQueries.contains(queryId)) {
-        LOGGER.warn("{} self-terminated. Heap Usage: {}. Query Thread: {}",
-            queryId, _watcherTask.getHeapUsageBytes(), queryThread.getName());
-        _cancelSentQueries.add(queryId);
+      if (_cancelSentQueries.add(queryId)) {
+        LOGGER.warn("Query: {} self-terminated. Total Heap Usage: {}. Query Thread: {}", queryId,
+            _watcherTask.getHeapUsageBytes(), queryThread.getName());
       }
     }
 
@@ -555,7 +555,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
      * The triggered level for the actions, only the highest level action will get triggered. Severity is defined by
      * the ordinal Normal(0) does not trigger any action.
      */
-    enum TriggeringLevel {
+    protected enum TriggeringLevel {
       Normal, HeapMemoryAlarmingVerbose, CPUTimeBasedKilling, HeapMemoryCritical, HeapMemoryPanic
     }
 
@@ -631,14 +631,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     /**
      * A watcher task to perform usage sampling, aggregation, and query preemption
      */
+    @SuppressWarnings({"rawtypes", "unchecked"})
     public class WatcherTask implements Runnable, PinotClusterConfigChangeListener {
-
-      protected AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>();
+      protected final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>();
 
       protected long _usedBytes;
       protected int _sleepTime;
       protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
-      protected TriggeringLevel _triggeringLevel;
+      protected TriggeringLevel _triggeringLevel = TriggeringLevel.Normal;
 
       // metrics class
       private final AbstractMetrics _metrics;
@@ -730,13 +730,18 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
       @Override
       public void run() {
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            runOnce();
-          } finally {
-            // Sleep for sometime
-            reschedule();
+        try {
+          //noinspection InfiniteLoopStatement
+          while (true) {
+            try {
+              runOnce();
+            } finally {
+              //noinspection BusyWait
+              Thread.sleep(_sleepTime);
+            }
           }
+        } catch (InterruptedException e) {
+          LOGGER.warn("WatcherTask interrupted, exiting.");
         }
       }
 
@@ -744,18 +749,19 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
         QueryMonitorConfig config = _queryMonitorConfig.get();
 
         LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
-        _triggeringLevel = TriggeringLevel.Normal;
         _sleepTime = config.getNormalSleepTime();
         _aggregatedUsagePerActiveQuery = null;
         try {
           // Get the metrics used for triggering the kill
           collectTriggerMetrics();
+          // Evaluate the triggering levels of query preemption
+          evalTriggers();
           // Prioritize the panic check, kill ALL QUERIES immediately if triggered
-          if (outOfMemoryPanicTrigger()) {
+          if (_triggeringLevel == TriggeringLevel.HeapMemoryPanic) {
+            killAllQueries();
+            reapFinishedTasks();
             return;
           }
-          // Check for other triggers
-          evalTriggers();
           // Refresh thread usage and aggregate to per query usage if triggered
           reapFinishedTasks();
           if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) {
@@ -786,46 +792,56 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
         LOGGER.debug("Heap used bytes {}", _usedBytes);
       }
 
-      /**
-       * determine if panic mode need to be triggered, kill all queries if yes
-       * @return if panic mode is triggered
-       */
-      private boolean outOfMemoryPanicTrigger() {
-        long panicLevel = _queryMonitorConfig.get().getPanicLevel();
-        // at this point we assume we have tried to kill some queries and the gc kicked in
-        // we have no choice but to kill all queries
-        if (_usedBytes >= panicLevel) {
-          killAllQueries();
-          _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
-          _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1);
-          LOGGER.error("Heap used bytes {}, greater than _panicLevel {}, Killed all queries and triggered gc!",
-              _usedBytes, panicLevel);
-          // read finished tasks here as will throw exception and
-          reapFinishedTasks();
-          return true;
-        }
-        return false;
-      }
-
       /**
        * Evaluate triggering levels of query preemption
        * Triggers should be mutually exclusive and evaluated following level high -> low
        */
       protected void evalTriggers() {
-        QueryMonitorConfig config = _queryMonitorConfig.get();
-
-        if (config.isCpuTimeBasedKillingEnabled()) {
-          _triggeringLevel = TriggeringLevel.CPUTimeBasedKilling;
-        }
+        TriggeringLevel previousTriggeringLevel = _triggeringLevel;
 
-        if (_usedBytes > config.getCriticalLevel()) {
+        // Compute the new triggering level based on the current heap usage
+        QueryMonitorConfig config = _queryMonitorConfig.get();
+        _triggeringLevel =
+            config.isCpuTimeBasedKillingEnabled() ? TriggeringLevel.CPUTimeBasedKilling : TriggeringLevel.Normal;
+        if (_usedBytes > config.getPanicLevel()) {
+          _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
+          _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1);
+        } else if (_usedBytes > config.getCriticalLevel()) {
           _triggeringLevel = TriggeringLevel.HeapMemoryCritical;
           _metrics.addMeteredGlobalValue(_heapMemoryCriticalExceededMeter, 1);
         } else if (_usedBytes > config.getAlarmingLevel()) {
           _sleepTime = config.getAlarmingSleepTime();
           // For debugging
-          _triggeringLevel = (IS_DEBUG_MODE_ENABLED && _triggeringLevel == TriggeringLevel.Normal)
-              ? TriggeringLevel.HeapMemoryAlarmingVerbose : _triggeringLevel;
+          if (IS_DEBUG_MODE_ENABLED && _triggeringLevel == TriggeringLevel.Normal) {
+            _triggeringLevel = TriggeringLevel.HeapMemoryAlarmingVerbose;
+          }
+        }
+
+        // Log the triggering level change
+        if (previousTriggeringLevel != _triggeringLevel) {
+          switch (_triggeringLevel) {
+            case HeapMemoryPanic:
+              LOGGER.error("Heap used bytes: {} exceeds panic level: {}, killing all queries", _usedBytes,
+                  config.getPanicLevel());
+              break;
+            case HeapMemoryCritical:
+              LOGGER.warn("Heap used bytes: {} exceeds critical level: {}, killing most expensive query", _usedBytes,
+                  config.getCriticalLevel());
+              if (!_isThreadMemorySamplingEnabled) {
+                LOGGER.error("Unable to terminate queries as memory tracking is not enabled");
+              }
+              break;
+            case CPUTimeBasedKilling:
+              if (!_isThreadCPUSamplingEnabled) {
+                LOGGER.error("Unable to terminate queries as CPU time tracking is not enabled");
+              }
+              break;
+            case HeapMemoryAlarmingVerbose:
+              LOGGER.debug("Heap used bytes: {} exceeds alarming level: {}", _usedBytes, config.getAlarmingLevel());
+              break;
+            default:
+              break;
+          }
         }
       }
 
@@ -835,29 +851,19 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       protected void triggeredActions() {
         switch (_triggeringLevel) {
           case HeapMemoryCritical:
-            LOGGER.warn("Heap used bytes {} exceeds critical level {}", _usedBytes,
-                _queryMonitorConfig.get().getCriticalLevel());
             killMostExpensiveQuery();
             break;
           case CPUTimeBasedKilling:
             killCPUTimeExceedQueries();
             break;
           case HeapMemoryAlarmingVerbose:
-            LOGGER.warn("Heap used bytes {} exceeds alarming level", _usedBytes);
-            LOGGER.warn("Query usage aggregation results {}", _aggregatedUsagePerActiveQuery.toString());
+            LOGGER.debug("Query usage aggregation results: {}", _aggregatedUsagePerActiveQuery);
             break;
           default:
             break;
         }
       }
 
-      void reschedule() {
-        try {
-          Thread.sleep(_sleepTime);
-        } catch (InterruptedException ignored) {
-        }
-      }
-
       void killAllQueries() {
         QueryMonitorConfig config = _queryMonitorConfig.get();
 
@@ -882,31 +888,34 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
        */
       private void killMostExpensiveQuery() {
         if (!_isThreadMemorySamplingEnabled) {
-          LOGGER.warn("Unable to terminate queries as  memory tracking is not enabled");
           return;
         }
-        QueryMonitorConfig config = _queryMonitorConfig.get();
-        // Critical heap memory usage while no queries running
-        if (_aggregatedUsagePerActiveQuery != null && !_aggregatedUsagePerActiveQuery.isEmpty()) {
-          AggregatedStats maxUsageTuple;
-          maxUsageTuple = _aggregatedUsagePerActiveQuery.values().stream()
+        if (!_aggregatedUsagePerActiveQuery.isEmpty()) {
+          AggregatedStats maxUsageTuple = _aggregatedUsagePerActiveQuery.values()
+              .stream()
               .filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
-              .max(Comparator.comparing(AggregatedStats::getAllocatedBytes)).orElse(null);
+              .max(Comparator.comparing(AggregatedStats::getAllocatedBytes))
+              .orElse(null);
           if (maxUsageTuple != null) {
-            boolean shouldKill =
-                config.isOomKillQueryEnabled() && maxUsageTuple._allocatedBytes > config.getMinMemoryFootprintForKill();
-            if (shouldKill) {
-              maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
-                  String.format(" Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
-                      maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
-              boolean hasCallBack = _queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
-              terminateQuery(maxUsageTuple);
-              logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
-            } else if (!config.isOomKillQueryEnabled()) {
-              LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
-                  + "because oomKillQueryEnabled is false", maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+            String queryId = maxUsageTuple.getQueryId();
+            long allocatedBytes = maxUsageTuple.getAllocatedBytes();
+            QueryMonitorConfig config = _queryMonitorConfig.get();
+            if (allocatedBytes > config.getMinMemoryFootprintForKill()) {
+              if (config.isOomKillQueryEnabled()) {
+                maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
+                    String.format("Query: %s got killed on %s: %s because it allocated: %d bytes of memory", queryId,
+                        _instanceType, _instanceId, allocatedBytes)));
+                boolean hasCallBack = _queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
+                terminateQuery(maxUsageTuple);
+                logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
+              } else {
+                LOGGER.warn("Query: {} got picked because it allocated: {} bytes of memory, "
+                    + "not killing it because OOM kill is not enabled", queryId, allocatedBytes);
+              }
             } else {
-              LOGGER.warn("But all queries are below quota, no query killed");
+              LOGGER.debug(
+                  "Query: {} has most allocated bytes: {}, but below the minimum memory footprint for kill: {}, "
+                      + "skipping query kill", queryId, allocatedBytes, config.getMinMemoryFootprintForKill());
             }
           }
           logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
@@ -916,24 +925,34 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       }
 
       private void killCPUTimeExceedQueries() {
-        QueryMonitorConfig config = _queryMonitorConfig.get();
-
-        for (Map.Entry<String, AggregatedStats> entry : _aggregatedUsagePerActiveQuery.entrySet()) {
-          AggregatedStats value = entry.getValue();
-          if (value._cpuNS > config.getCpuTimeBasedKillingThresholdNS()) {
-            LOGGER.error("Current task status recorded is {}. Query {} got picked because using {} ns of cpu time,"
-                    + " greater than threshold {}", _threadEntriesMap, value._queryId, value.getCpuTimeNs(),
-                config.getCpuTimeBasedKillingThresholdNS());
-            value._exceptionAtomicReference.set(new RuntimeException(
-                String.format("Query %s got killed on %s: %s because using %d "
-                        + "CPU time exceeding limit of %d ns CPU time", value._queryId, _instanceType, _instanceId,
-                    value.getCpuTimeNs(), config.getCpuTimeBasedKillingThresholdNS())));
-            boolean hasCallBack = _queryCancelCallbacks.getIfPresent(value.getQueryId()) != null;
-            terminateQuery(value);
-            logTerminatedQuery(value, _usedBytes, hasCallBack);
+        if (!_isThreadCPUSamplingEnabled) {
+          return;
+        }
+        if (!_aggregatedUsagePerActiveQuery.isEmpty()) {
+          QueryMonitorConfig config = _queryMonitorConfig.get();
+          for (Map.Entry<String, AggregatedStats> entry : _aggregatedUsagePerActiveQuery.entrySet()) {
+            AggregatedStats stats = entry.getValue();
+            String queryId = stats.getQueryId();
+            if (_cancelSentQueries.contains(queryId)) {
+              continue;
+            }
+            long cpuTimeNs = stats.getCpuTimeNs();
+            if (cpuTimeNs > config.getCpuTimeBasedKillingThresholdNS()) {
+              LOGGER.debug("Current task status recorded is {}. Query {} got picked because using {} ns of cpu time,"
+                      + " greater than threshold {}", _threadEntriesMap, queryId, cpuTimeNs,
+                  config.getCpuTimeBasedKillingThresholdNS());
+              stats._exceptionAtomicReference.set(new RuntimeException(String.format(
+                  "Query: %s got killed on %s: %s because it used: %d ns of CPU time (exceeding threshold: %d)",
+                  queryId, _instanceType, _instanceId, cpuTimeNs, config.getCpuTimeBasedKillingThresholdNS())));
+              boolean hasCallBack = _queryCancelCallbacks.getIfPresent(queryId) != null;
+              terminateQuery(stats);
+              logTerminatedQuery(stats, _usedBytes, hasCallBack);
+            }
           }
+          logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
+        } else {
+          LOGGER.debug("No active queries to kill");
         }
-        logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
       }
 
       private void terminateQuery(AggregatedStats queryResourceTracker) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
index 4c051c1b5b8..c0428205721 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
@@ -49,8 +49,7 @@ import org.slf4j.LoggerFactory;
  * Aggregator that computes resource aggregation for queries. Most of the logic from PerQueryCPUMemAccountantFactory is
  * retained here for backward compatibility.
  *
- * Design and algorithm are outlined in
- * https://docs.google.com/document/d/1Z9DYAfKznHQI9Wn8BjTWZYTcNRVGiPP0B8aEP3w_1jQ
+ * TODO: Integrate recent changes in PerQueryCPUMemAccountantFactory
  */
 public class QueryAggregator implements ResourceAggregator {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryAggregator.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
index ba46611343a..5b1ba56c0f8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
@@ -29,7 +29,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 
 class TestResourceAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
   TestResourceAccountant(Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries) {
-    super(new PinotConfiguration(), false, true, true, new HashSet<>(), "test", InstanceType.SERVER);
+    super(new PinotConfiguration(), false, true, new HashSet<>(), "test", InstanceType.SERVER);
     _threadEntriesMap.putAll(threadEntries);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
index 63fa21b7a69..11cb1b128e6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
@@ -34,9 +34,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -95,8 +92,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
   @BeforeClass
   public void setUp()
       throws Exception {
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.ERROR);
     ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
 
@@ -130,13 +125,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
 
     //Wait for all documents loaded
     waitForAllDocsLoaded(10_000L);
-
-    // Setup logging and resource accounting
-    LogManager.getLogger(OfflineClusterMemBasedBrokerQueryKillingTest.class).setLevel(Level.INFO);
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.INFO);
-    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
-    LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
   }
 
   protected void startBrokers()
@@ -242,11 +230,10 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
         }
     );
     countDownLatch.await();
-    assertTrue(queryResponse1.get().get("exceptions").toString().contains(
-        "Interrupted in broker reduce phase"));
-    assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":"
-        + QueryErrorCode.QUERY_CANCELLATION.getId()));
-    assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
+    String exceptions = queryResponse1.get().get("exceptions").toString();
+    assertTrue(exceptions.contains("Interrupted in broker reduce phase"));
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()));
+    assertTrue(exceptions.contains("got killed on BROKER"));
     assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
     assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 732daed1e50..42db5fd0a11 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -34,8 +34,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -171,8 +169,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   @BeforeClass
   public void setUp()
       throws Exception {
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.ERROR);
     ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
 
@@ -207,13 +203,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
 
     //Wait for all documents loaded
     waitForAllDocsLoaded(10_000L);
-
-    // Setup logging and resource accounting
-    LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO);
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.INFO);
-    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
-    LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
   }
 
   protected void startBrokers()
@@ -293,9 +282,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   public void testDigestOOM()
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY);
-    String exceptionsNode = queryResponse.get("exceptions").toString();
-    assertTrue(exceptionsNode.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptionsNode);
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    String exceptions = queryResponse.get("exceptions").toString();
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+    assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
   }
 
   @Test
@@ -327,10 +316,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   public void testSelectionOnlyOOM()
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY_SELECTION_ONLY);
-
-    String exceptionsNode = queryResponse.get("exceptions").toString();
-    assertTrue(exceptionsNode.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptionsNode);
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    String exceptions = queryResponse.get("exceptions").toString();
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+    assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
   }
 
   @Test
@@ -350,8 +338,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   public void testDigestOOM2()
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY_2);
-    String exceptionsNode = queryResponse.get("exceptions").toString();
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    String exceptions = queryResponse.get("exceptions").toString();
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+    assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
   }
 
   @Test
@@ -371,8 +360,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   public void testDigestOOM3()
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY_3);
-    String exceptionsNode = queryResponse.get("exceptions").toString();
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    String exceptions = queryResponse.get("exceptions").toString();
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+    assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
   }
 
   @Test
@@ -392,8 +382,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
   public void testDigestOOM4()
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY_4);
-    String exceptionsNode = queryResponse.get("exceptions").toString();
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    String exceptions = queryResponse.get("exceptions").toString();
+    assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+    assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
   }
 
   @Test
@@ -447,7 +438,7 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
     countDownLatch.await();
     String exceptionsNode = queryResponse1.get().get("exceptions").toString();
     assertTrue(exceptionsNode.contains("\"errorCode\":503"), exceptionsNode);
-    assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+    assertTrue(exceptionsNode.contains("got killed"), exceptionsNode);
     assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()), exceptionsNode);
     assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()), exceptionsNode);
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
index 8788cdbe068..75f8cac88e8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
@@ -34,9 +34,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -94,8 +91,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
   @BeforeClass
   public void setUp()
       throws Exception {
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.ERROR);
     ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
 
@@ -130,13 +125,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
 
     //Wait for all documents loaded
     waitForAllDocsLoaded(10_000L);
-
-    // Setup logging and resource accounting
-    LogManager.getLogger(OfflineClusterServerCPUTimeQueryKillingTest.class).setLevel(Level.INFO);
-    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
-        .setLevel(Level.INFO);
-    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
-    LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
   }
 
   protected void startBrokers()
@@ -254,8 +242,9 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
         }
     );
     countDownLatch.await();
-    assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed on SERVER"));
-    assertTrue(queryResponse1.get().get("exceptions").toString().contains("CPU time exceeding limit of"));
+    String exceptions = queryResponse1.get().get("exceptions").toString();
+    assertTrue(exceptions.contains("got killed on SERVER"));
+    assertTrue(exceptions.contains("CPU time"));
     assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
     assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to