Jackie-Jiang commented on code in PR #16080: URL: https://github.com/apache/pinot/pull/16080#discussion_r2140933749
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java: ########## @@ -64,18 +66,19 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler { private AdjustableSemaphore _semaphore; private final AtomicInteger _currentQueryServerThreads = new AtomicInteger(); + private final PinotConfiguration _pinotConfiguration; Review Comment: (minor) Suggest naming it `_brokerConf` for clarity ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java: ########## @@ -193,4 +204,24 @@ public int currentQueryServerThreads() { int availablePermits() { return _semaphore.availablePermits(); } + + @VisibleForTesting + int calculateMaxServerQueryThreads(PinotConfiguration config) { + Map<String, String> clusterConfigMap = _helixAdmin.getConfig( + _helixConfigScope, + Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromClusterConfig = Integer.parseInt( + clusterConfigMap.getOrDefault( + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromBrokerConfig = config.getProperty( + CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS); Review Comment: Given this one doesn't change, we can calculate it in the constructor and store it in a member variable ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java: ########## @@ -193,4 +204,24 @@ public int currentQueryServerThreads() { int availablePermits() { return _semaphore.availablePermits(); } + + @VisibleForTesting + int calculateMaxServerQueryThreads(PinotConfiguration config) { + Map<String, String> clusterConfigMap = _helixAdmin.getConfig( + _helixConfigScope, + 
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromClusterConfig = Integer.parseInt( + clusterConfigMap.getOrDefault( + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromBrokerConfig = config.getProperty( + CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS); + + if (maxServerQueryThreadsFromClusterConfig <= 0 && maxServerQueryThreadsFromBrokerConfig <= 0) { + return 0; + } + return Math.min(maxServerQueryThreadsFromClusterConfig, Review Comment: If only one of them is positive, we want to pick the positive one. Currently it will end up picking the negative one ########## pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java: ########## @@ -307,9 +309,75 @@ public void testDisabledToEnabledTransitionDisallowed() } } + @Test + public void testCalculateMaxServerQueryThreads() { + // Neither config is set, both use defaults + when(_helixAdmin.getConfig(any(), + eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))) + ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1")); + + PinotConfiguration emptyConfig = new PinotConfiguration(); // No MSE_MAX_SERVER_QUERY_THREADS set + _multiStageQueryThrottler = new MultiStageQueryThrottler(emptyConfig); + _multiStageQueryThrottler.init(_helixManager); + + Assert.assertEquals(0, _multiStageQueryThrottler.calculateMaxServerQueryThreads(emptyConfig)); Review Comment: Swap the two arguments. The first one is `actual` and the second one is `expected`. 
Same for other asserts ########## pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java: ########## @@ -46,21 +50,31 @@ public HardLimitExecutor(int max, ExecutorService executorService) { */ public static int getMultiStageExecutorHardLimit(PinotConfiguration config) { try { - int maxThreads = Integer.parseInt(config.getProperty( + int maxThreadsFromClusterConfig = Integer.parseInt(config.getProperty( Review Comment: This one is not from cluster config. We can make it the following way: - If the server level config (the one newly added) is configured, use it - If it is not configured, fall back to the old one ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -195,6 +195,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config); if (hardLimit > 0) { + LOGGER.info("Setting hard limit for multi-stage executor to: hardLimit={}", hardLimit); Review Comment: (minor) ```suggestion LOGGER.info("Setting hard limit for multi-stage executor to: {}", hardLimit); ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java: ########## @@ -193,4 +204,24 @@ public int currentQueryServerThreads() { int availablePermits() { return _semaphore.availablePermits(); } + + @VisibleForTesting + int calculateMaxServerQueryThreads(PinotConfiguration config) { + Map<String, String> clusterConfigMap = _helixAdmin.getConfig( + _helixConfigScope, + Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromClusterConfig = Integer.parseInt( + clusterConfigMap.getOrDefault( + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)); + int maxServerQueryThreadsFromBrokerConfig = config.getProperty( 
+ CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS, + CommonConstants.Broker.DEFAULT_MSE_MAX_SERVER_QUERY_THREADS); Review Comment: Since the instance level override has higher priority, we can consider always using the instance level one when it exists. We calculate the cluster level one only if the instance level one doesn't exist ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java: ########## @@ -193,4 +204,24 @@ public int currentQueryServerThreads() { int availablePermits() { return _semaphore.availablePermits(); } + + @VisibleForTesting + int calculateMaxServerQueryThreads(PinotConfiguration config) { Review Comment: (minor) No need to pass in the argument ########## pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java: ########## @@ -23,13 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Executor that allows a maximum of tasks running at the same time, rejecting immediately any excess. */ public class HardLimitExecutor extends DecoratorExecutorService { + private static final Logger LOGGER = LoggerFactory.getLogger(HardLimitExecutor.class); Review Comment: Seems not used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org