This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a09d01faae9 Remove unnecessary methods and config for ThreadResourceUsageAccountant (#16490)
a09d01faae9 is described below
commit a09d01faae93e5156884d9429decdd43aa8f5582
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Mon Aug 4 15:57:15 2025 +0530
Remove unnecessary methods and config for ThreadResourceUsageAccountant (#16490)
---
.../PerQueryCPUMemAccountantFactory.java | 20 ----------
.../accounting/ResourceUsageAccountantFactory.java | 16 --------
.../query/runtime/operator/MultiStageOperator.java | 2 +-
.../runtime/operator/MultiStageAccountingTest.java | 2 +-
.../MultistageResourceUsageAccountingTest.java | 2 +-
.../queries/PerQueryCPUMemAccountantTest.java | 43 ----------------------
.../accounting/ThreadResourceUsageAccountant.java | 5 ---
.../java/org/apache/pinot/spi/trace/Tracing.java | 13 -------
.../apache/pinot/spi/utils/CommonConstants.java | 3 --
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 4 --
10 files changed, 3 insertions(+), 107 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index b88f06f2917..293edff6ff6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -127,9 +127,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
// track memory usage
protected final boolean _isThreadMemorySamplingEnabled;
- // is sampling allowed for MSE queries
- protected final boolean _isThreadSamplingEnabledForMSE;
-
protected final Set<String> _inactiveQuery;
protected Set<String> _cancelSentQueries;
@@ -148,7 +145,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
_config = config;
_isThreadCPUSamplingEnabled = isThreadCPUSamplingEnabled;
_isThreadMemorySamplingEnabled = isThreadMemorySamplingEnabled;
- _isThreadSamplingEnabledForMSE = isThreadSamplingEnabledForMSE;
_inactiveQuery = inactiveQuery;
_instanceId = instanceId;
_instanceType = instanceType;
@@ -184,11 +180,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
LOGGER.info("_isThreadCPUSamplingEnabled: {},
_isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled);
- _isThreadSamplingEnabledForMSE =
-
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
- CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
- LOGGER.info("_isThreadSamplingEnabledForMSE: {}",
_isThreadSamplingEnabledForMSE);
-
_queryCancelCallbacks = CacheBuilder.newBuilder().maximumSize(
config.getProperty(CommonConstants.Accounting.CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE,
CommonConstants.Accounting.DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE)).expireAfterWrite(
@@ -274,17 +265,6 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
sampleThreadCPUTime();
}
- /**
- * Sample Usage for Multi-stage engine queries
- */
- @Override
- public void sampleUsageMSE() {
- if (_isThreadSamplingEnabledForMSE) {
- sampleThreadBytesAllocated();
- sampleThreadCPUTime();
- }
- }
-
@Override
public boolean throttleQuerySubmission() {
return getWatcherTask().getHeapUsageBytes() >
getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index e00f74d888d..93c68ca7fe3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -82,9 +82,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
// track memory usage
private final boolean _isThreadMemorySamplingEnabled;
- // is sampling allowed for MSE queries
- private final boolean _isThreadSamplingEnabledForMSE;
-
private final WatcherTask _watcherTask;
private final EnumMap<TrackingScope, ResourceAggregator>
_resourceAggregators;
@@ -109,11 +106,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
LOGGER.info("_isThreadCPUSamplingEnabled: {},
_isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
_isThreadMemorySamplingEnabled);
- _isThreadSamplingEnabledForMSE =
-
config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
- CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE);
- LOGGER.info("_isThreadSamplingEnabledForMSE: {}",
_isThreadSamplingEnabledForMSE);
-
_watcherTask = new WatcherTask();
_resourceAggregators = new EnumMap<>(TrackingScope.class);
@@ -138,14 +130,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
sampleThreadCPUTime();
}
- @Override
- public void sampleUsageMSE() {
- if (_isThreadSamplingEnabledForMSE) {
- sampleThreadBytesAllocated();
- sampleThreadCPUTime();
- }
- }
-
@Override
public boolean isAnchorThreadInterrupted() {
ThreadExecutionContext context =
_threadLocalEntry.get().getCurrentThreadTaskStatus();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 3b179ed4b80..adf02c497f4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -97,7 +97,7 @@ public abstract class MultiStageOperator
earlyTerminate();
throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " +
getExplainName());
}
- Tracing.ThreadAccountantOps.sampleMSE();
+ Tracing.ThreadAccountantOps.sample();
if (Tracing.ThreadAccountantOps.isInterrupted()) {
earlyTerminate();
throw
QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Resource limit
exceeded for operator: "
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index de34a977d05..57b27b907fd 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -92,7 +92,7 @@ public class MultiStageAccountingTest implements ITest {
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
// init accountant and start watcher task
Tracing.unregisterThreadAccountant();
- Tracing.ThreadAccountantOps.initializeThreadAccountant(new
PinotConfiguration(configs), "testGroupBy",
+ Tracing.ThreadAccountantOps.createThreadAccountant(new
PinotConfiguration(configs), "testGroupBy",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
index 534f1f413d5..8312bd098ba 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
@@ -92,7 +92,7 @@ public class MultistageResourceUsageAccountingTest implements
ITest {
// init accountant and start watcher task
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
Tracing.unregisterThreadAccountant();
- Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg,
"testGroupBy", InstanceType.SERVER);
+ Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
"testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
// Setup Thread Context
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
index 42915480a9a..1aa4b23af29 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
-import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
@@ -98,48 +97,6 @@ public class PerQueryCPUMemAccountantTest extends
QueryRunnerAccountingTest {
}
}
- @Test
- void testDisableSamplingForMSE() {
- HashMap<String, Object> configs = getAccountingConfig();
-
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
false);
-
- ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
- PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
- new
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new
PinotConfiguration(configs),
- "testWithPerQueryAccountantFactory", InstanceType.SERVER);
-
- try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class,
Mockito.CALLS_REAL_METHODS)) {
- tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
- ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2",
false).getResultTable();
- Assert.assertEquals(resultTable.getRows().size(), 2);
-
- Map<String, ? extends QueryResourceTracker> resources =
accountant.getQueryResources();
- Assert.assertEquals(resources.size(), 1);
-
Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(),
0);
- }
- }
-
- @Test
- void testDisableSamplingWithResourceUsageAccountantForMSE() {
- HashMap<String, Object> configs = getAccountingConfig();
-
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE,
false);
-
- ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
- ResourceUsageAccountantFactory.ResourceUsageAccountant accountant =
- new ResourceUsageAccountantFactory.ResourceUsageAccountant(new
PinotConfiguration(configs),
- "testWithPerQueryAccountantFactory", InstanceType.SERVER);
-
- try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class,
Mockito.CALLS_REAL_METHODS)) {
- tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
- ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2",
false).getResultTable();
- Assert.assertEquals(resultTable.getRows().size(), 2);
-
- Map<String, ? extends QueryResourceTracker> resources =
accountant.getQueryResources();
- Assert.assertEquals(resources.size(), 1);
-
Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(),
0);
- }
- }
-
public static class InterruptingAccountant
extends
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index db41723d5be..b744778e8a9 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -75,11 +75,6 @@ public interface ThreadResourceUsageAccountant {
*/
void sampleUsage();
- /**
- * Sample Usage for Multi-stage engine queries
- */
- void sampleUsageMSE();
-
default boolean throttleQuerySubmission() {
return false;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 6d9993cd4b7..d2466edbe47 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -206,10 +206,6 @@ public class Tracing {
public void sampleUsage() {
}
- @Override
- public void sampleUsageMSE() {
- }
-
@Override
public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs,
long allocatedBytes,
TrackingScope trackingScope) {
@@ -295,19 +291,10 @@ public class Tracing {
Tracing.getThreadAccountant().sampleUsage();
}
- public static void sampleMSE() {
- Tracing.getThreadAccountant().sampleUsageMSE();
- }
-
public static void clear() {
Tracing.getThreadAccountant().clear();
}
- public static void initializeThreadAccountant(PinotConfiguration config,
String instanceId,
- InstanceType instanceType) {
- createThreadAccountant(config, instanceId, instanceType);
- }
-
public static ThreadResourceUsageAccountant
createThreadAccountant(PinotConfiguration config, String instanceId,
InstanceType instanceType) {
_workloadBudgetManager = new WorkloadBudgetManager(config);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d8152b1812c..e8c46ff8c8e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1538,9 +1538,6 @@ public class CommonConstants {
public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED =
"accounting.query.killed.metric.enabled";
public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
- public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE =
"accounting.enable.thread.sampling.mse.debug";
- public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true;
-
public static final String CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE =
"accounting.cancel.callback.cache.max.size";
public static final int DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE = 500;
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
index 141b55129f0..5005c543677 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -75,10 +75,6 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
public void sampleUsage() {
}
- @Override
- public void sampleUsageMSE() {
- }
-
@Override
public boolean throttleQuerySubmission() {
return _numCalls.getAndIncrement() > 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]