Jackie-Jiang commented on code in PR #16080:
URL: https://github.com/apache/pinot/pull/16080#discussion_r2141347061


##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java:
##########
@@ -46,21 +46,28 @@ public HardLimitExecutor(int max, ExecutorService 
executorService) {
    */
   public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
     try {
-      int maxThreads = Integer.parseInt(config.getProperty(
+      int serverConfigLimit = 
config.getProperty(CommonConstants.Server.CONFIG_OF_MSE_MAX_EXECUTION_THREADS,
+              CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS);
+      if (serverConfigLimit > 0) {
+        return serverConfigLimit;
+      }
+      int maxThreadsFromClusterConfig = Integer.parseInt(config.getProperty(
           
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
           
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
       ));
       int hardLimitFactor = Integer.parseInt(config.getProperty(
           
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR,
           
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR
       ));
-      if (maxThreads <= 0 || hardLimitFactor <= 0) {
+      if (maxThreadsFromClusterConfig <= 0 || hardLimitFactor <= 0) {
         return 0;
       }
-      return maxThreads * hardLimitFactor;
+      return maxThreadsFromClusterConfig * hardLimitFactor;
     } catch (NumberFormatException e) {
-      return 
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
-          * 
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+      int defaultLimitFromClusterConfig =
+              
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)
+              * 
Integer.parseInt(CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_HARDLIMIT_FACTOR);
+      return 
Math.min(CommonConstants.Server.DEFAULT_MSE_MAX_EXECUTION_THREADS, 
defaultLimitFromClusterConfig);

Review Comment:
   Not introduced in this PR, but it will always return negative value. 
Consider log a warning and return `-1` which disables the throttling



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -56,26 +57,28 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
   private HelixConfigScope _helixConfigScope;
   private int _numBrokers;
   private int _numServers;
+  private AdjustableSemaphore _semaphore;
   /**
    * If _maxServerQueryThreads is <= 0, it means that the cluster is not 
configured to limit the number of multi-stage
    * queries that can be executed concurrently. In this case, we should not 
block the query.
    */
   private int _maxServerQueryThreads;
-  private AdjustableSemaphore _semaphore;
+  private final int _maxServerQueryThreadsFromServerConfig;
   private final AtomicInteger _currentQueryServerThreads = new AtomicInteger();
 
+  public MultiStageQueryThrottler(PinotConfiguration brokerConf) {
+    _maxServerQueryThreadsFromServerConfig = brokerConf.getProperty(
+            CommonConstants.Broker.CONFIG_OF_MSE_MAX_SERVER_QUERY_THREADS,

Review Comment:
   (Code Style) Is the indentation correct?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/HardLimitExecutor.java:
##########
@@ -46,21 +50,31 @@ public HardLimitExecutor(int max, ExecutorService 
executorService) {
    */
   public static int getMultiStageExecutorHardLimit(PinotConfiguration config) {
     try {
-      int maxThreads = Integer.parseInt(config.getProperty(
+      int maxThreadsFromClusterConfig = Integer.parseInt(config.getProperty(

Review Comment:
   Consider renaming the variable given this is instance level config



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -195,6 +195,7 @@ public void init(PinotConfiguration config, 
InstanceDataManager instanceDataMana
 
     int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config);
     if (hardLimit > 0) {
+      LOGGER.info("Setting hard limit for multi-stage executor to: 
hardLimit={}", hardLimit);

Review Comment:
   We don't usually use `k=v` in the log. Most common way is `key: {}`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -86,7 +89,10 @@ public void init(HelixManager helixManager) {
         .count());
 
     if (_maxServerQueryThreads > 0) {
-      _semaphore = new AdjustableSemaphore(Math.max(1, _maxServerQueryThreads 
* _numServers / _numBrokers), true);
+      int semaphoreLimit = Math.max(1, _maxServerQueryThreads * _numServers / 
_numBrokers);
+      LOGGER.info("Setting max server query threads to {} for 
_maxServerQueryThreads={}, {} brokers, and {} servers",

Review Comment:
   (minor) We usually put colon before variables. Same for other places
   ```suggestion
         LOGGER.info("Setting max server query threads to: {} with 
maxServerQueryThreads: {}, numBrokers: {}, numServers: {}",
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java:
##########
@@ -56,26 +57,28 @@ public class MultiStageQueryThrottler implements 
ClusterChangeHandler {
   private HelixConfigScope _helixConfigScope;
   private int _numBrokers;
   private int _numServers;
+  private AdjustableSemaphore _semaphore;
   /**
    * If _maxServerQueryThreads is <= 0, it means that the cluster is not 
configured to limit the number of multi-stage
    * queries that can be executed concurrently. In this case, we should not 
block the query.
    */
   private int _maxServerQueryThreads;
-  private AdjustableSemaphore _semaphore;
+  private final int _maxServerQueryThreadsFromServerConfig;

Review Comment:
   ```suggestion
     private final int _maxServerQueryThreadsFromBrokerConfig;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to