This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 88de1b7eba Set InstanceType for OOM Protection during startup (#15374) 88de1b7eba is described below commit 88de1b7eba63b15e6ac46d71f3c78dcc396219b2 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Tue Apr 8 05:51:56 2025 +0530 Set InstanceType for OOM Protection during startup (#15374) --- .../pinot/broker/broker/helix/BaseBrokerStarter.java | 3 ++- .../AggregateByQueryIdAccountantFactoryForTest.java | 9 +++++---- .../accounting/HeapUsagePublishingAccountantFactory.java | 3 ++- .../core/accounting/PerQueryCPUMemAccountantFactory.java | 14 +++++++------- .../accounting/PerQueryCPUMemAccountantFactoryForTest.java | 10 ++++++---- .../core/accounting/ResourceManagerAccountingTest.java | 8 +++++--- .../tests/OOMProtectionEnabledIntegrationTest.java | 7 ------- .../OfflineClusterMemBasedBrokerQueryKillingTest.java | 3 --- .../OfflineClusterMemBasedServerQueryKillingTest.java | 3 --- .../tests/OfflineClusterServerCPUTimeQueryKillingTest.java | 3 --- .../integration/tests/WindowResourceAccountingTest.java | 4 ---- .../query/runtime/operator/MultiStageAccountingTest.java | 4 +++- .../query/runtime/queries/QueryRunnerAccountingTest.java | 12 +++++++----- .../pinot/server/starter/helix/BaseServerStarter.java | 3 ++- .../pinot/spi/accounting/ThreadAccountantFactory.java | 3 ++- .../src/main/java/org/apache/pinot/spi/trace/Tracing.java | 6 ++++-- .../java/org/apache/pinot/spi/utils/CommonConstants.java | 4 ---- 17 files changed, 45 insertions(+), 54 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 7fbafc6bea..464e7dc48e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -412,7 +412,8 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT)); Tracing.ThreadAccountantOps.initializeThreadAccountant( - _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); + _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId, + org.apache.pinot.spi.config.instance.InstanceType.BROKER); String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL); if (controllerUrl != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java index 2a74362494..06c96169b8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/AggregateByQueryIdAccountantFactoryForTest.java @@ -25,13 +25,14 @@ import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; public class AggregateByQueryIdAccountantFactoryForTest implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { - return new AggregateByQueryIdAccountant(config, instanceId); + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId, InstanceType instanceType) { + return new AggregateByQueryIdAccountant(config, instanceId, instanceType); } public static class QueryResourceTrackerImpl implements QueryResourceTracker { @@ -75,8 +76,8 @@ public class AggregateByQueryIdAccountantFactoryForTest implements ThreadAccount extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { Map<String, QueryResourceTrackerImpl> _queryMemUsage = new ConcurrentHashMap<>(); - public AggregateByQueryIdAccountant(PinotConfiguration config, String instanceId) { - super(config, instanceId); + public AggregateByQueryIdAccountant(PinotConfiguration config, String instanceId, InstanceType instanceType) { + super(config, instanceId, instanceType); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java index e29d33d044..7e82550286 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java @@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; @@ -38,7 +39,7 @@ import org.apache.pinot.spi.utils.CommonConstants; public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId, InstanceType instanceType) { int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS, CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD); return new HeapUsagePublishingResourceUsageAccountant(period); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 7052b077f6..4f1fa0087f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -63,8 +63,8 @@ import org.slf4j.LoggerFactory; public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { - return new PerQueryCPUMemResourceUsageAccountant(config, instanceId); + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId, InstanceType instanceType) { + return new PerQueryCPUMemResourceUsageAccountant(config, instanceId, instanceType); } public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant { @@ -135,7 +135,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory // instance id of the current instance, for logging purpose private final String _instanceId; - public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId) { + private final InstanceType _instanceType; + + public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId, + InstanceType instanceType) { LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant"); _config = config; @@ -155,6 +158,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory LOGGER.info("cpuSamplingConfig: {}, memorySamplingConfig: {}", cpuSamplingConfig, memorySamplingConfig); + _instanceType = instanceType; _isThreadCPUSamplingEnabled = cpuSamplingConfig && threadCpuTimeMeasurementEnabled; _isThreadMemorySamplingEnabled = memorySamplingConfig && threadMemoryMeasurementEnabled; LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled, @@ -644,10 +648,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory _config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED, CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED); - private final InstanceType _instanceType = - InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, - CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString())); - private long _usedBytes; private int _sleepTime; private int _numQueriesKilledConsecutively = 0; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java index 3e6c9004a8..d20f068a36 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.pinot.core.query.utils.QueryIdUtils; import org.apache.pinot.spi.accounting.ThreadAccountantFactory; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; @@ -30,14 +31,15 @@ import org.apache.pinot.spi.env.PinotConfiguration; */ public class PerQueryCPUMemAccountantFactoryForTest implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { - return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId); + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId, InstanceType instanceType) { + return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId, instanceType); } public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { - public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId) { - super(config, instanceId); + public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId, + InstanceType instanceType) { + super(config, instanceId, instanceType); } public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java index d75fa73c6d..0e304b9e03 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java @@ -60,6 +60,7 @@ import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.JsonIndexConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.EarlyTerminationException; @@ -284,7 +285,7 @@ public class ResourceManagerAccountingTest { PinotConfiguration config = getConfig(20, 2, configs); ResourceManager rm = getResourceManager(20, 2, 1, 1, configs); // init accountant and start watcher task - Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect"); + Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testSelect", InstanceType.SERVER); CountDownLatch latch = new CountDownLatch(100); AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false); @@ -353,7 +354,7 @@ public class ResourceManagerAccountingTest { PinotConfiguration config = getConfig(20, 2, configs); ResourceManager rm = getResourceManager(20, 2, 1, 1, configs); // init accountant and start watcher task - Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy"); + Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testGroupBy", InstanceType.SERVER); CountDownLatch latch = new CountDownLatch(100); AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false); @@ -409,7 +410,8 @@ public class ResourceManagerAccountingTest { PinotConfiguration config = getConfig(2, 2, configs); ResourceManager rm = getResourceManager(2, 2, 1, 1, configs); // init accountant and start watcher task - Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testJsonIndexExtractMapOOM"); + Tracing.ThreadAccountantOps.initializeThreadAccountant(config, "testJsonIndexExtractMapOOM", + InstanceType.SERVER); Supplier<String> randomJsonValue = () -> { Random random = new Random(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java index 2bb9efa851..68c3a8a421 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java @@ -20,7 +20,6 @@ package org.apache.pinot.integration.tests; import java.io.File; import java.util.List; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -41,9 +40,6 @@ public class OOMProtectionEnabledIntegrationTest extends BaseClusterIntegrationT configuration.setProperty( CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory"); - configuration.setProperty( - CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, - InstanceType.BROKER.toString()); configuration.setProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true); } @@ -54,9 +50,6 @@ public class OOMProtectionEnabledIntegrationTest extends BaseClusterIntegrationT configuration.setProperty( CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory"); - configuration.setProperty( - CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, - InstanceType.SERVER.toString()); configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java index 2422d778c9..41a5edaeb8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java @@ -39,7 +39,6 @@ import org.apache.log4j.LogManager; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; @@ -155,8 +154,6 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.40f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, 0.0025f); - brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." - + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, InstanceType.BROKER); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java index 70a87dfcad..29de87eec2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java @@ -38,7 +38,6 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; @@ -179,8 +178,6 @@ public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterInt + CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.0f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.60f); - brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." - + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, InstanceType.BROKER); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java index 9854943198..7f3e6a518c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterServerCPUTimeQueryKillingTest.java @@ -38,7 +38,6 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; @@ -178,8 +177,6 @@ public class OfflineClusterServerCPUTimeQueryKillingTest extends BaseClusterInte + CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.2f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 1.1f); - brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." - + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, InstanceType.BROKER); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f); brokerConf.setProperty(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java index f1ec44f794..d511787f8e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java @@ -28,7 +28,6 @@ import org.apache.pinot.core.accounting.AggregateByQueryIdAccountantFactoryForTe import org.apache.pinot.integration.tests.window.utils.WindowFunnelUtils; import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -66,9 +65,6 @@ public class WindowResourceAccountingTest extends BaseClusterIntegrationTest { } protected void overrideBrokerConf(PinotConfiguration brokerConf) { - brokerConf.setProperty( - CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, - InstanceType.BROKER); brokerConf.setProperty( CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME, AggregateByQueryIdAccountantFactoryForTest.class.getCanonicalName()); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java index 820b9b0e6b..0aa9f386c6 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java @@ -42,6 +42,7 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; @@ -90,7 +91,8 @@ public class MultiStageAccountingTest implements ITest { configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false); configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true); // init accountant and start watcher task - Tracing.ThreadAccountantOps.initializeThreadAccountant(new PinotConfiguration(configs), "testGroupBy"); + Tracing.ThreadAccountantOps.initializeThreadAccountant(new PinotConfiguration(configs), "testGroupBy", + InstanceType.SERVER); // Setup Thread Context Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", ThreadExecutionContext.TaskType.MSE); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java index 3c6d809626..28353ead2c 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.Tracing; @@ -112,7 +113,7 @@ public class QueryRunnerAccountingTest extends QueryRunnerTestBase { ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), - "testWithPerQueryAccountantFactory"); + "testWithPerQueryAccountantFactory", InstanceType.SERVER); try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); @@ -134,7 +135,7 @@ public class QueryRunnerAccountingTest extends QueryRunnerTestBase { ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), - "testWithPerQueryAccountantFactory"); + "testWithPerQueryAccountantFactory", InstanceType.SERVER); try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); @@ -150,8 +151,8 @@ public class QueryRunnerAccountingTest extends QueryRunnerTestBase { public static class InterruptingAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { - public InterruptingAccountant(PinotConfiguration config, String instanceId) { - super(config, instanceId); + public InterruptingAccountant(PinotConfiguration config, String instanceId, InstanceType instanceType) { + super(config, instanceId, instanceType); } @Override @@ -166,7 +167,8 @@ public class QueryRunnerAccountingTest extends QueryRunnerTestBase { ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); InterruptingAccountant accountant = - new InterruptingAccountant(new PinotConfiguration(configs), "testWithPerQueryAccountantFactory"); + new InterruptingAccountant(new PinotConfiguration(configs), "testWithPerQueryAccountantFactory", + InstanceType.SERVER); try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 847b1ae282..c832600861 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -678,7 +678,8 @@ public abstract class BaseServerStarter implements ServiceStartable { instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries); // initialize the thread accountant for query killing Tracing.ThreadAccountantOps.initializeThreadAccountant( - _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); + _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId, + org.apache.pinot.spi.config.instance.InstanceType.SERVER); initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java index f3755c5569..b208fa6a18 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java @@ -18,9 +18,10 @@ */ package org.apache.pinot.spi.accounting; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; public interface ThreadAccountantFactory { - ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId); + ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId, InstanceType instanceType); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 4b5ea93c79..964a64bad1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -32,6 +32,7 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.utils.CommonConstants; @@ -315,7 +316,8 @@ public class Tracing { Tracing.getThreadAccountant().clear(); } - public static void initializeThreadAccountant(PinotConfiguration config, String instanceId) { + public static void initializeThreadAccountant(PinotConfiguration config, String instanceId, + InstanceType instanceType) { String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME); if (factoryName == null) { LOGGER.warn("No thread accountant factory provided, using default implementation"); @@ -324,7 +326,7 @@ public class Tracing { try { ThreadAccountantFactory threadAccountantFactory = (ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance(); - boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId)); + boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId, instanceType)); LOGGER.info("Using accountant provided by {}", factoryName); if (!registered) { LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 5f6b186f70..bcbcc49681 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; -import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.query.QueryThreadContext; @@ -1257,9 +1256,6 @@ public class CommonConstants { public static final String CONFIG_OF_GC_BACKOFF_COUNT = "accounting.gc.backoff.count"; public static final int DEFAULT_GC_BACKOFF_COUNT = 5; - public static final String CONFIG_OF_INSTANCE_TYPE = "accounting.instance.type"; - public static final InstanceType DEFAULT_CONFIG_OF_INSTANCE_TYPE = InstanceType.SERVER; - public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms"; public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org