This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 325054d28d4 Reduce log for PerQueryCPUMemResourceUsageAccountant (#16642)
325054d28d4 is described below
commit 325054d28d4ff27ca7273c48972b19bdb7241e0a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 20 12:26:50 2025 -0600
Reduce log for PerQueryCPUMemResourceUsageAccountant (#16642)
---
.../PerQueryCPUMemAccountantFactory.java | 221 +++++++++++----------
.../pinot/core/accounting/QueryAggregator.java | 3 +-
.../core/accounting/TestResourceAccountant.java | 2 +-
...flineClusterMemBasedBrokerQueryKillingTest.java | 21 +-
...flineClusterMemBasedServerQueryKillingTest.java | 41 ++--
...fflineClusterServerCPUTimeQueryKillingTest.java | 17 +-
6 files changed, 145 insertions(+), 160 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 79f5d671966..efc8709f941 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.accounting;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Collection;
@@ -141,22 +142,21 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
protected final InstanceType _instanceType;
protected PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, boolean isThreadCPUSamplingEnabled,
- boolean isThreadMemorySamplingEnabled, boolean isThreadSamplingEnabledForMSE, Set<String> inactiveQuery,
- String instanceId, InstanceType instanceType) {
+ boolean isThreadMemorySamplingEnabled, Set<String> inactiveQuery, String instanceId,
+ InstanceType instanceType) {
_config = config;
_isThreadCPUSamplingEnabled = isThreadCPUSamplingEnabled;
_isThreadMemorySamplingEnabled = isThreadMemorySamplingEnabled;
_inactiveQuery = inactiveQuery;
_instanceId = instanceId;
_instanceType = instanceType;
- _cancelSentQueries = new HashSet<>();
+ _cancelSentQueries = ConcurrentHashMap.newKeySet();
_watcherTask = createWatcherTask();
_queryCancelCallbacks = CacheBuilder.newBuilder().build();
}
public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId,
InstanceType instanceType) {
-
LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
_config = config;
_instanceId = instanceId;
@@ -191,7 +191,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
// task/query tracking
_inactiveQuery = new HashSet<>();
- _cancelSentQueries = new HashSet<>();
+ _cancelSentQueries = ConcurrentHashMap.newKeySet();
_watcherTask = createWatcherTask();
}
@@ -517,9 +517,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
}
protected void logQueryResourceUsage(Map<String, ? extends QueryResourceTracker> aggregatedUsagePerActiveQuery) {
- LOGGER.warn("Query aggregation results {} for the previous kill.", aggregatedUsagePerActiveQuery);
+ LOGGER.debug("Query aggregation results: {} for the previous kill.", aggregatedUsagePerActiveQuery);
}
+ @VisibleForTesting
public void cancelQuery(String queryId, Thread anchorThread) {
MseCancelCallback callback = _queryCancelCallbacks.getIfPresent(queryId);
if (callback != null) {
@@ -533,16 +534,15 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
protected void logTerminatedQuery(QueryResourceTracker queryResourceTracker, long totalHeapMemoryUsage,
boolean hasCallback) {
- LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}. Used Callback: {}",
+ LOGGER.warn("Query: {} terminated. Memory Usage: {}. Cpu Usage: {}. Total Heap Usage: {}. Used Callback: {}",
queryResourceTracker.getQueryId(), queryResourceTracker.getAllocatedBytes(),
queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage, hasCallback);
}
protected void logSelfTerminatedQuery(String queryId, Thread queryThread) {
- if (!_cancelSentQueries.contains(queryId)) {
- LOGGER.warn("{} self-terminated. Heap Usage: {}. Query Thread: {}",
- queryId, _watcherTask.getHeapUsageBytes(), queryThread.getName());
- _cancelSentQueries.add(queryId);
+ if (_cancelSentQueries.add(queryId)) {
+ LOGGER.warn("Query: {} self-terminated. Total Heap Usage: {}. Query
Thread: {}", queryId,
+ _watcherTask.getHeapUsageBytes(), queryThread.getName());
}
}
@@ -555,7 +555,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
* The triggered level for the actions, only the highest level action will get triggered. Severity is defined by
* the ordinal Normal(0) does not trigger any action.
*/
- enum TriggeringLevel {
+ protected enum TriggeringLevel {
Normal, HeapMemoryAlarmingVerbose, CPUTimeBasedKilling, HeapMemoryCritical, HeapMemoryPanic
}
@@ -631,14 +631,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
/**
* A watcher task to perform usage sampling, aggregation, and query preemption
*/
+ @SuppressWarnings({"rawtypes", "unchecked"})
public class WatcherTask implements Runnable, PinotClusterConfigChangeListener {
-
- protected AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>();
+ protected final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>();
protected long _usedBytes;
protected int _sleepTime;
protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
- protected TriggeringLevel _triggeringLevel;
+ protected TriggeringLevel _triggeringLevel = TriggeringLevel.Normal;
// metrics class
private final AbstractMetrics _metrics;
@@ -730,13 +730,18 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
@Override
public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- runOnce();
- } finally {
- // Sleep for sometime
- reschedule();
+ try {
+ //noinspection InfiniteLoopStatement
+ while (true) {
+ try {
+ runOnce();
+ } finally {
+ //noinspection BusyWait
+ Thread.sleep(_sleepTime);
+ }
}
+ } catch (InterruptedException e) {
+ LOGGER.warn("WatcherTask interrupted, exiting.");
}
}
@@ -744,18 +749,19 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
QueryMonitorConfig config = _queryMonitorConfig.get();
LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
- _triggeringLevel = TriggeringLevel.Normal;
_sleepTime = config.getNormalSleepTime();
_aggregatedUsagePerActiveQuery = null;
try {
// Get the metrics used for triggering the kill
collectTriggerMetrics();
+ // Evaluate the triggering levels of query preemption
+ evalTriggers();
// Prioritize the panic check, kill ALL QUERIES immediately if triggered
- if (outOfMemoryPanicTrigger()) {
+ if (_triggeringLevel == TriggeringLevel.HeapMemoryPanic) {
+ killAllQueries();
+ reapFinishedTasks();
return;
}
- // Check for other triggers
- evalTriggers();
// Refresh thread usage and aggregate to per query usage if triggered
reapFinishedTasks();
if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) {
@@ -786,46 +792,56 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
LOGGER.debug("Heap used bytes {}", _usedBytes);
}
- /**
- * determine if panic mode need to be triggered, kill all queries if yes
- * @return if panic mode is triggered
- */
- private boolean outOfMemoryPanicTrigger() {
- long panicLevel = _queryMonitorConfig.get().getPanicLevel();
- // at this point we assume we have tried to kill some queries and the gc kicked in
- // we have no choice but to kill all queries
- if (_usedBytes >= panicLevel) {
- killAllQueries();
- _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
- _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1);
- LOGGER.error("Heap used bytes {}, greater than _panicLevel {},
Killed all queries and triggered gc!",
- _usedBytes, panicLevel);
- // read finished tasks here as will throw exception and
- reapFinishedTasks();
- return true;
- }
- return false;
- }
-
/**
* Evaluate triggering levels of query preemption
* Triggers should be mutually exclusive and evaluated following level high -> low
*/
protected void evalTriggers() {
- QueryMonitorConfig config = _queryMonitorConfig.get();
-
- if (config.isCpuTimeBasedKillingEnabled()) {
- _triggeringLevel = TriggeringLevel.CPUTimeBasedKilling;
- }
+ TriggeringLevel previousTriggeringLevel = _triggeringLevel;
- if (_usedBytes > config.getCriticalLevel()) {
+ // Compute the new triggering level based on the current heap usage
+ QueryMonitorConfig config = _queryMonitorConfig.get();
+ _triggeringLevel =
+ config.isCpuTimeBasedKillingEnabled() ? TriggeringLevel.CPUTimeBasedKilling : TriggeringLevel.Normal;
+ if (_usedBytes > config.getPanicLevel()) {
+ _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
+ _metrics.addMeteredGlobalValue(_heapMemoryPanicExceededMeter, 1);
+ } else if (_usedBytes > config.getCriticalLevel()) {
_triggeringLevel = TriggeringLevel.HeapMemoryCritical;
_metrics.addMeteredGlobalValue(_heapMemoryCriticalExceededMeter, 1);
} else if (_usedBytes > config.getAlarmingLevel()) {
_sleepTime = config.getAlarmingSleepTime();
// For debugging
- _triggeringLevel = (IS_DEBUG_MODE_ENABLED && _triggeringLevel == TriggeringLevel.Normal)
- ? TriggeringLevel.HeapMemoryAlarmingVerbose : _triggeringLevel;
+ if (IS_DEBUG_MODE_ENABLED && _triggeringLevel == TriggeringLevel.Normal) {
+ _triggeringLevel = TriggeringLevel.HeapMemoryAlarmingVerbose;
+ }
+ }
+
+ // Log the triggering level change
+ if (previousTriggeringLevel != _triggeringLevel) {
+ switch (_triggeringLevel) {
+ case HeapMemoryPanic:
+ LOGGER.error("Heap used bytes: {} exceeds panic level: {},
killing all queries", _usedBytes,
+ config.getPanicLevel());
+ break;
+ case HeapMemoryCritical:
+ LOGGER.warn("Heap used bytes: {} exceeds critical level: {},
killing most expensive query", _usedBytes,
+ config.getCriticalLevel());
+ if (!_isThreadMemorySamplingEnabled) {
+ LOGGER.error("Unable to terminate queries as memory tracking
is not enabled");
+ }
+ break;
+ case CPUTimeBasedKilling:
+ if (!_isThreadCPUSamplingEnabled) {
+ LOGGER.error("Unable to terminate queries as CPU time tracking
is not enabled");
+ }
+ break;
+ case HeapMemoryAlarmingVerbose:
+ LOGGER.debug("Heap used bytes: {} exceeds alarming level: {}",
_usedBytes, config.getAlarmingLevel());
+ break;
+ default:
+ break;
+ }
}
}
@@ -835,29 +851,19 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
protected void triggeredActions() {
switch (_triggeringLevel) {
case HeapMemoryCritical:
- LOGGER.warn("Heap used bytes {} exceeds critical level {}",
_usedBytes,
- _queryMonitorConfig.get().getCriticalLevel());
killMostExpensiveQuery();
break;
case CPUTimeBasedKilling:
killCPUTimeExceedQueries();
break;
case HeapMemoryAlarmingVerbose:
- LOGGER.warn("Heap used bytes {} exceeds alarming level",
_usedBytes);
- LOGGER.warn("Query usage aggregation results {}",
_aggregatedUsagePerActiveQuery.toString());
+ LOGGER.debug("Query usage aggregation results: {}",
_aggregatedUsagePerActiveQuery);
break;
default:
break;
}
}
- void reschedule() {
- try {
- Thread.sleep(_sleepTime);
- } catch (InterruptedException ignored) {
- }
- }
-
void killAllQueries() {
QueryMonitorConfig config = _queryMonitorConfig.get();
@@ -882,31 +888,34 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
*/
private void killMostExpensiveQuery() {
if (!_isThreadMemorySamplingEnabled) {
- LOGGER.warn("Unable to terminate queries as memory tracking is not
enabled");
return;
}
- QueryMonitorConfig config = _queryMonitorConfig.get();
- // Critical heap memory usage while no queries running
- if (_aggregatedUsagePerActiveQuery != null && !_aggregatedUsagePerActiveQuery.isEmpty()) {
- AggregatedStats maxUsageTuple;
- maxUsageTuple = _aggregatedUsagePerActiveQuery.values().stream()
+ if (!_aggregatedUsagePerActiveQuery.isEmpty()) {
+ AggregatedStats maxUsageTuple = _aggregatedUsagePerActiveQuery.values()
+ .stream()
.filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
- .max(Comparator.comparing(AggregatedStats::getAllocatedBytes)).orElse(null);
+ .max(Comparator.comparing(AggregatedStats::getAllocatedBytes))
+ .orElse(null);
if (maxUsageTuple != null) {
- boolean shouldKill =
- config.isOomKillQueryEnabled() && maxUsageTuple._allocatedBytes > config.getMinMemoryFootprintForKill();
- if (shouldKill) {
- maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
- String.format(" Query %s got killed because using %d bytes
of memory on %s: %s, exceeding the quota",
- maxUsageTuple._queryId,
maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
- boolean hasCallBack =
_queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
- terminateQuery(maxUsageTuple);
- logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
- } else if (!config.isOomKillQueryEnabled()) {
- LOGGER.warn("Query {} got picked because using {} bytes of
memory, actual kill committed false "
- + "because oomKillQueryEnabled is false",
maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+ String queryId = maxUsageTuple.getQueryId();
+ long allocatedBytes = maxUsageTuple.getAllocatedBytes();
+ QueryMonitorConfig config = _queryMonitorConfig.get();
+ if (allocatedBytes > config.getMinMemoryFootprintForKill()) {
+ if (config.isOomKillQueryEnabled()) {
+ maxUsageTuple._exceptionAtomicReference.set(new RuntimeException(
+ String.format("Query: %s got killed on %s: %s because it allocated: %d bytes of memory", queryId,
+ _instanceType, _instanceId, allocatedBytes)));
+ boolean hasCallBack = _queryCancelCallbacks.getIfPresent(maxUsageTuple.getQueryId()) != null;
+ terminateQuery(maxUsageTuple);
+ logTerminatedQuery(maxUsageTuple, _usedBytes, hasCallBack);
+ } else {
+ LOGGER.warn("Query: {} got picked because it allocated: {}
bytes of memory, "
+ + "not killing it because OOM kill is not enabled",
queryId, allocatedBytes);
+ }
} else {
- LOGGER.warn("But all queries are below quota, no query killed");
+ LOGGER.debug(
+ "Query: {} has most allocated bytes: {}, but below the
minimum memory footprint for kill: {}, "
+ + "skipping query kill", queryId, allocatedBytes,
config.getMinMemoryFootprintForKill());
}
}
logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
@@ -916,24 +925,34 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
}
private void killCPUTimeExceedQueries() {
- QueryMonitorConfig config = _queryMonitorConfig.get();
-
- for (Map.Entry<String, AggregatedStats> entry : _aggregatedUsagePerActiveQuery.entrySet()) {
- AggregatedStats value = entry.getValue();
- if (value._cpuNS > config.getCpuTimeBasedKillingThresholdNS()) {
- LOGGER.error("Current task status recorded is {}. Query {} got
picked because using {} ns of cpu time,"
- + " greater than threshold {}", _threadEntriesMap,
value._queryId, value.getCpuTimeNs(),
- config.getCpuTimeBasedKillingThresholdNS());
- value._exceptionAtomicReference.set(new RuntimeException(
- String.format("Query %s got killed on %s: %s because using %d "
- + "CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, _instanceId,
- value.getCpuTimeNs(),
config.getCpuTimeBasedKillingThresholdNS())));
- boolean hasCallBack =
_queryCancelCallbacks.getIfPresent(value.getQueryId()) != null;
- terminateQuery(value);
- logTerminatedQuery(value, _usedBytes, hasCallBack);
+ if (!_isThreadCPUSamplingEnabled) {
+ return;
+ }
+ if (!_aggregatedUsagePerActiveQuery.isEmpty()) {
+ QueryMonitorConfig config = _queryMonitorConfig.get();
+ for (Map.Entry<String, AggregatedStats> entry : _aggregatedUsagePerActiveQuery.entrySet()) {
+ AggregatedStats stats = entry.getValue();
+ String queryId = stats.getQueryId();
+ if (_cancelSentQueries.contains(queryId)) {
+ continue;
+ }
+ long cpuTimeNs = stats.getCpuTimeNs();
+ if (cpuTimeNs > config.getCpuTimeBasedKillingThresholdNS()) {
+ LOGGER.debug("Current task status recorded is {}. Query {} got
picked because using {} ns of cpu time,"
+ + " greater than threshold {}", _threadEntriesMap,
queryId, cpuTimeNs,
+ config.getCpuTimeBasedKillingThresholdNS());
+ stats._exceptionAtomicReference.set(new
RuntimeException(String.format(
+ "Query: %s got killed on %s: %s because it used: %d ns of
CPU time (exceeding threshold: %d)",
+ queryId, _instanceType, _instanceId, cpuTimeNs,
config.getCpuTimeBasedKillingThresholdNS())));
+ boolean hasCallBack =
_queryCancelCallbacks.getIfPresent(queryId) != null;
+ terminateQuery(stats);
+ logTerminatedQuery(stats, _usedBytes, hasCallBack);
+ }
}
+ logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
+ } else {
+ LOGGER.debug("No active queries to kill");
}
- logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
}
private void terminateQuery(AggregatedStats queryResourceTracker) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
index 4c051c1b5b8..c0428205721 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryAggregator.java
@@ -49,8 +49,7 @@ import org.slf4j.LoggerFactory;
* Aggregator that computes resource aggregation for queries. Most of the logic from PerQueryCPUMemAccountantFactory is
* retained here for backward compatibility.
*
- * Design and algorithm are outlined in
- * https://docs.google.com/document/d/1Z9DYAfKznHQI9Wn8BjTWZYTcNRVGiPP0B8aEP3w_1jQ
+ * TODO: Integrate recent changes in PerQueryCPUMemAccountantFactory
*/
public class QueryAggregator implements ResourceAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryAggregator.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
index ba46611343a..5b1ba56c0f8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
@@ -29,7 +29,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
class TestResourceAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
TestResourceAccountant(Map<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> threadEntries) {
- super(new PinotConfiguration(), false, true, true, new HashSet<>(), "test", InstanceType.SERVER);
+ super(new PinotConfiguration(), false, true, new HashSet<>(), "test", InstanceType.SERVER);
_threadEntriesMap.putAll(threadEntries);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
index 63fa21b7a69..11cb1b128e6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
@@ -34,9 +34,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -95,8 +92,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
@BeforeClass
public void setUp()
throws Exception {
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.ERROR);
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
@@ -130,13 +125,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
//Wait for all documents loaded
waitForAllDocsLoaded(10_000L);
-
- // Setup logging and resource accounting
- LogManager.getLogger(OfflineClusterMemBasedBrokerQueryKillingTest.class).setLevel(Level.INFO);
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.INFO);
- LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
- LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
}
protected void startBrokers()
@@ -242,11 +230,10 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
}
);
countDownLatch.await();
- assertTrue(queryResponse1.get().get("exceptions").toString().contains(
- "Interrupted in broker reduce phase"));
- assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":"
- + QueryErrorCode.QUERY_CANCELLATION.getId()));
- assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
+ String exceptions = queryResponse1.get().get("exceptions").toString();
+ assertTrue(exceptions.contains("Interrupted in broker reduce phase"));
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()));
+ assertTrue(exceptions.contains("got killed on BROKER"));
assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 732daed1e50..42db5fd0a11 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -34,8 +34,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -171,8 +169,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
@BeforeClass
public void setUp()
throws Exception {
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.ERROR);
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
@@ -207,13 +203,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
//Wait for all documents loaded
waitForAllDocsLoaded(10_000L);
-
- // Setup logging and resource accounting
- LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO);
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.INFO);
- LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
- LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
}
protected void startBrokers()
@@ -293,9 +282,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
public void testDigestOOM()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY);
- String exceptionsNode = queryResponse.get("exceptions").toString();
- assertTrue(exceptionsNode.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptionsNode);
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ String exceptions = queryResponse.get("exceptions").toString();
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+ assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
}
@Test
@@ -327,10 +316,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
public void testSelectionOnlyOOM()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY_SELECTION_ONLY);
-
- String exceptionsNode = queryResponse.get("exceptions").toString();
- assertTrue(exceptionsNode.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptionsNode);
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ String exceptions = queryResponse.get("exceptions").toString();
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+ assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
}
@Test
@@ -350,8 +338,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
public void testDigestOOM2()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY_2);
- String exceptionsNode = queryResponse.get("exceptions").toString();
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ String exceptions = queryResponse.get("exceptions").toString();
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+ assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
}
@Test
@@ -371,8 +360,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
public void testDigestOOM3()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY_3);
- String exceptionsNode = queryResponse.get("exceptions").toString();
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ String exceptions = queryResponse.get("exceptions").toString();
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+ assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
}
@Test
@@ -392,8 +382,9 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
public void testDigestOOM4()
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY_4);
- String exceptionsNode = queryResponse.get("exceptions").toString();
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ String exceptions = queryResponse.get("exceptions").toString();
+ assertTrue(exceptions.contains("\"errorCode\":" + QueryErrorCode.QUERY_CANCELLATION.getId()), exceptions);
+ assertTrue(exceptions.contains("got killed on SERVER"), exceptions);
}
@Test
@@ -447,7 +438,7 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt
countDownLatch.await();
String exceptionsNode = queryResponse1.get().get("exceptions").toString();
assertTrue(exceptionsNode.contains("\"errorCode\":503"), exceptionsNode);
- assertTrue(exceptionsNode.contains("got killed because"), exceptionsNode);
+ assertTrue(exceptionsNode.contains("got killed"), exceptionsNode);
assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()), exceptionsNode);
assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()), exceptionsNode);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
index 8788cdbe068..75f8cac88e8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java
@@ -34,9 +34,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -94,8 +91,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
@BeforeClass
public void setUp()
throws Exception {
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.ERROR);
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
@@ -130,13 +125,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
//Wait for all documents loaded
waitForAllDocsLoaded(10_000L);
-
- // Setup logging and resource accounting
- LogManager.getLogger(OfflineClusterServerCPUTimeQueryKillingTest.class).setLevel(Level.INFO);
- LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
- .setLevel(Level.INFO);
- LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
- LogManager.getLogger(Tracing.class).setLevel(Level.INFO);
}
protected void startBrokers()
@@ -254,8 +242,9 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte
}
);
countDownLatch.await();
- assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed on SERVER"));
- assertTrue(queryResponse1.get().get("exceptions").toString().contains("CPU time exceeding limit of"));
+ String exceptions = queryResponse1.get().get("exceptions").toString();
+ assertTrue(exceptions.contains("got killed on SERVER"));
+ assertTrue(exceptions.contains("CPU time"));
assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
}
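For readers skimming the diff above, a minimal self-contained sketch of the log-deduplication pattern used in logSelfTerminatedQuery: a thread-safe set created with ConcurrentHashMap.newKeySet() doubles as the "already logged" guard, since Set.add() returns false for repeats. The class and main method below are hypothetical illustrations, not Pinot code.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SelfTerminationLogSketch {
  // Thread-safe set of query ids that have already been reported.
  private final Set<String> _cancelSentQueries = ConcurrentHashMap.newKeySet();

  // Logs at most once per query id; later calls for the same id are suppressed.
  public void logSelfTerminatedQuery(String queryId) {
    if (_cancelSentQueries.add(queryId)) {
      System.out.println("Query: " + queryId + " self-terminated");
    }
  }

  public static void main(String[] args) {
    SelfTerminationLogSketch sketch = new SelfTerminationLogSketch();
    sketch.logSelfTerminatedQuery("q1"); // printed
    sketch.logSelfTerminatedQuery("q1"); // suppressed
  }
}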
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]