This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3a8c578020 Return 503 for all interrupted queries. Refactor the query killing code. (#10683) 3a8c578020 is described below commit 3a8c57802059c26a4b8733414407be769f732816 Author: Jia Guo <jia...@linkedin.com> AuthorDate: Wed May 10 15:53:31 2023 -0700 Return 503 for all interrupted queries. Refactor the query killing code. (#10683) * Change the query cancellation error code to 503 Refine the return error code of query killing * Trigger Test * Trigger Test --- .../broker/broker/helix/BaseBrokerStarter.java | 2 +- .../pinot/common/exception/QueryException.java | 2 +- .../HeapUsagePublishingAccountantFactory.java | 2 +- .../PerQueryCPUMemAccountantFactory.java | 36 +++++++++++++++------- .../PerQueryCPUMemAccountantFactoryForTest.java | 8 ++--- .../blocks/results/ExceptionResultsBlock.java | 5 +++ .../query/scheduler/resources/ResourceManager.java | 3 -- ...flineClusterMemBasedBrokerQueryKillingTest.java | 3 ++ ...lineClusterMemBasedServerQueryKillingTest.java} | 10 ++++-- .../server/starter/helix/BaseServerStarter.java | 4 +++ .../spi/accounting/ThreadAccountantFactory.java | 2 +- .../java/org/apache/pinot/spi/trace/Tracing.java | 4 +-- .../apache/pinot/spi/utils/CommonConstants.java | 3 ++ 13 files changed, 57 insertions(+), 27 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 7c68519b3d..e8329ec06f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -321,7 +321,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT)); Tracing.ThreadAccountantOps - .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX)); + .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL); if (controllerUrl != null) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index bb8684605e..a6a1a2bfa4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -54,7 +54,7 @@ public class QueryException { public static final int ACCESS_DENIED_ERROR_CODE = 180; public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190; public static final int QUERY_EXECUTION_ERROR_CODE = 200; - public static final int QUERY_CANCELLATION_ERROR_CODE = 205; + public static final int QUERY_CANCELLATION_ERROR_CODE = 503; // TODO: Handle these errors in broker public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210; public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java index c310494905..e29d33d044 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java @@ -38,7 +38,7 @@ import org.apache.pinot.spi.utils.CommonConstants; public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config) { + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS, CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD); return new HeapUsagePublishingResourceUsageAccountant(period); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 8e81b2581d..5b724f4a0f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory; public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config) { - return new PerQueryCPUMemResourceUsageAccountant(config); + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { + return new PerQueryCPUMemResourceUsageAccountant(config, instanceId); } public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant { @@ -125,10 +125,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory // the periodical task that aggregates and preempts queries private final WatcherTask _watcherTask; - public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config) { + // instance id of the current instance, for logging purpose + private final String _instanceId; + + public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId) { LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant"); _config = config; + _instanceId = instanceId; boolean threadCpuTimeMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled(); boolean threadMemoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled(); @@ -540,6 +544,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS, CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L; + // + private final boolean _isQueryKilledMetricEnabled = + _config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED, + CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED); + private final InstanceType _instanceType = InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString())); @@ -730,7 +739,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory killedCount += 1; } } - _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount); + if (_isQueryKilledMetricEnabled) { + _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount); + } try { Thread.sleep(_normalSleepTime); } catch (InterruptedException ignored) { @@ -778,8 +789,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory if (shouldKill) { maxUsageTuple._exceptionAtomicReference .set(new RuntimeException(String.format( - " Query %s got killed because using %d bytes of memory on %s, exceeding the quota", - maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType))); + " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota", + maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId))); interruptRunnerThread(maxUsageTuple.getAnchorThread()); LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}", maxUsageTuple._queryId, maxUsageTuple._allocatedBytes); @@ -797,8 +808,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory if (_oomKillQueryEnabled) { maxUsageTuple._exceptionAtomicReference .set(new RuntimeException(String.format( - " Query %s got killed because memory pressure, using %d ns of CPU time on %s", - maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType))); + " Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s", + maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId))); interruptRunnerThread(maxUsageTuple.getAnchorThread()); LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true", maxUsageTuple._allocatedBytes, maxUsageTuple._queryId); @@ -819,8 +830,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory LOGGER.error("Query {} got picked because using {} ns of cpu time, greater than threshold {}", value._queryId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS); value._exceptionAtomicReference.set(new RuntimeException( - String.format("Query %s got killed on %s because using %d CPU time exceeding limit of %d ns CPU time", - value._queryId, _instanceType, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS))); + String.format("Query %s got killed on %s: %s because using %d " + + "CPU time exceeding limit of %d ns CPU time", + value._queryId, _instanceType, _instanceId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS))); interruptRunnerThread(value.getAnchorThread()); } } @@ -829,7 +841,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory private void interruptRunnerThread(Thread thread) { thread.interrupt(); - _metrics.addMeteredGlobalValue(_queryKilledMeter, 1); + if (_isQueryKilledMetricEnabled) { + _metrics.addMeteredGlobalValue(_queryKilledMeter, 1); + } _numQueriesKilledConsecutively += 1; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java index f769c0ddc7..3e6c9004a8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java @@ -30,14 +30,14 @@ import org.apache.pinot.spi.env.PinotConfiguration; */ public class PerQueryCPUMemAccountantFactoryForTest implements ThreadAccountantFactory { @Override - public ThreadResourceUsageAccountant init(PinotConfiguration config) { - return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config); + public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) { + return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId); } public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { - public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config) { - super(config); + public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId) { + super(config, instanceId); } public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java index 02abb39b84..0b7e6e05d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java @@ -26,6 +26,7 @@ import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.spi.exception.QueryCancelledException; public class ExceptionResultsBlock extends BaseResultsBlock { @@ -38,6 +39,10 @@ public class ExceptionResultsBlock extends BaseResultsBlock { this(QueryException.QUERY_EXECUTION_ERROR, t); } + public ExceptionResultsBlock(QueryCancelledException t) { + this(QueryException.QUERY_CANCELLATION_ERROR, t); + } + @Override public int getNumRows() { return 0; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java index b98093fde4..9414dc11ed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java @@ -28,7 +28,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant; import org.apache.pinot.core.util.trace.TracedThreadFactory; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,8 +93,6 @@ public abstract class ResourceManager { CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT); _queryWorkers = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory)); - - Tracing.ThreadAccountantOps.initializeThreadAccountant(config); } public void stop() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java index b11ddc402c..d67d34fc9b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java @@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; @@ -254,6 +255,8 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1); Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains( "Interrupted in broker reduce phase")); + Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":" + + QueryException.QUERY_CANCELLATION_ERROR_CODE)); Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because")); Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString())); Assert.assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString())); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java similarity index 96% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java index 8a07cc7c53..db877ac005 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java @@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.config.instance.InstanceType; @@ -58,8 +59,8 @@ import org.testng.annotations.Test; /** * Integration test for heap size based server query killing, this works only for xmx4G */ -public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterIntegrationTestSet { - private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class); +public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterIntegrationTestSet { + private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class); public static final String STRING_DIM_SV1 = "stringDimSV1"; public static final String STRING_DIM_SV2 = "stringDimSV2"; public static final String INT_DIM_SV1 = "intDimSV1"; @@ -101,7 +102,7 @@ private static final int NUM_DOCS = 3_000_000; public void setUp() throws Exception { // Setup logging and resource accounting - LogManager.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class).setLevel(Level.INFO); + LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO); LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class) .setLevel(Level.INFO); LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO); @@ -216,6 +217,8 @@ private static final int NUM_DOCS = 3_000_000; throws Exception { JsonNode queryResponse = postQuery(OOM_QUERY); LOGGER.info("testDigestOOM: {}", queryResponse); + Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":" + + QueryException.QUERY_CANCELLATION_ERROR_CODE)); Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException")); Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because")); } @@ -267,6 +270,7 @@ private static final int NUM_DOCS = 3_000_000; ); countDownLatch.await(); LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1); + Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":503")); Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("QueryCancelledException")); Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because")); Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString())); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 0f77155dbe..3fd6710265 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -87,6 +87,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.services.ServiceStartable; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance; @@ -544,6 +545,9 @@ public abstract class BaseServerStarter implements ServiceStartable { ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries); + // initialize the thread accountant for query killing + Tracing.ThreadAccountantOps + .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java index 803219ce26..f3755c5569 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java @@ -22,5 +22,5 @@ import org.apache.pinot.spi.env.PinotConfiguration; public interface ThreadAccountantFactory { - ThreadResourceUsageAccountant init(PinotConfiguration config); + ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 0881f763d9..a853b03c20 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -257,7 +257,7 @@ public class Tracing { Tracing.getThreadAccountant().clear(); } - public static void initializeThreadAccountant(PinotConfiguration config) { + public static void initializeThreadAccountant(PinotConfiguration config, String instanceId) { String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME); if (factoryName == null) { LOGGER.warn("No thread accountant factory provided, using default implementation"); @@ -266,7 +266,7 @@ public class Tracing { try { ThreadAccountantFactory threadAccountantFactory = (ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance(); - boolean registered = Tracing.register(threadAccountantFactory.init(config)); + boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId)); LOGGER.info("Using accountant provided by {}", factoryName); if (!registered) { LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 5e03e40217..585c56520c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -820,6 +820,9 @@ public class CommonConstants { public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms"; public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0; + + public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled"; + public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false; } public static class ExecutorService { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org