Jackie-Jiang commented on code in PR #14943:
URL: https://github.com/apache/pinot/pull/14943#discussion_r1938058304


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
  */
 package org.apache.pinot.segment.local.utils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Used to throttle the total concurrent index rebuilds that can happen on a 
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total 
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the 
following levels:
+ * - All index throttling
+ * - StarTree index throttling
  * Code paths that do no need to rebuild the index or which don't happen on 
the server need not utilize this throttler.
  */
-public class SegmentPreprocessThrottler implements 
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
 
-  /**
-   * _maxPreprocessConcurrency and 
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively 
disable
-   * throttling, this can be set to a very high value
-   */
-  private int _maxPreprocessConcurrency;
-  private int _maxPreprocessConcurrencyBeforeServingQueries;
-  private boolean _isServingQueries;
-  private final AdjustableSemaphore _semaphore;
-
-  /**
-   * @param maxPreprocessConcurrency configured preprocessing concurrency
-   * @param maxPreprocessConcurrencyBeforeServingQueries configured 
preprocessing concurrency before serving queries
-   * @param isServingQueries whether the server is ready to serve queries or 
not
-   */
-  public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int 
maxPreprocessConcurrencyBeforeServingQueries,
-      boolean isServingQueries) {
-    LOGGER.info("Initializing SegmentPreprocessThrottler, 
maxPreprocessConcurrency: {}, "
-            + "maxPreprocessConcurrencyBeforeServingQueries: {}, 
isServingQueries: {}",
-        maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
-    Preconditions.checkArgument(maxPreprocessConcurrency > 0,
-        "Max preprocess parallelism must be > 0, but found to be: " + 
maxPreprocessConcurrency);
-    Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 
0,
-        "Max preprocess parallelism before serving queries must be > 0, but 
found to be: "
-            + maxPreprocessConcurrencyBeforeServingQueries);
-
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    _isServingQueries = isServingQueries;
-
-    // maxConcurrentPreprocessesBeforeServingQueries is only used prior to 
serving queries and once the server is
-    // ready to serve queries this is not used again. This too is configurable 
via ZK CLUSTER config updates while the
-    // server is starting up.
-    int preprocessConcurrency = _maxPreprocessConcurrency;
-    if (!isServingQueries) {
-      preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
-      LOGGER.info("Serving queries is disabled, setting preprocess concurrency 
to: {}", preprocessConcurrency);
-    }
-    _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
-    LOGGER.info("Created semaphore with total permits: {}, available permits: 
{}", totalPermits(),
-        availablePermits());
-  }
-
-  public synchronized void startServingQueries() {
-    LOGGER.info("Serving queries is to be enabled, reset throttling threshold 
for segment preprocess concurrency, "
-        + "total permits: {}, available permits: {}", totalPermits(), 
availablePermits());
-    _isServingQueries = true;
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Reset throttling completed, new concurrency: {}, total 
permits: {}, available permits: {}",
-        _maxPreprocessConcurrency, totalPermits(), availablePermits());
-  }
-
-  @Override
-  public synchronized void onChange(Set<String> changedConfigs, Map<String, 
String> clusterConfigs) {
-    if (CollectionUtils.isEmpty(changedConfigs)) {
-      LOGGER.info("Skip updating SegmentPreprocessThrottler configs with 
unchanged clusterConfigs");
-      return;
-    }
-
-    LOGGER.info("Updating SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-    handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
-    handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, 
clusterConfigs);
-    LOGGER.info("Updated SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-  }
-
-  private void handleMaxPreprocessConcurrencyChange(Set<String> 
changedConfigs, Map<String, String> clusterConfigs) {
-    if 
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
 {
-      LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was 
not updated, skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String maxParallelSegmentPreprocessesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrency;
-    try {
-      maxPreprocessConcurrency = 
Integer.parseInt(maxParallelSegmentPreprocessesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making 
change, fix config and try again",
-          maxParallelSegmentPreprocessesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrency <= 0) {
-      LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making 
change, fix config and try again",
-          maxPreprocessConcurrency);
-      return;
-    }
-
-    if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
-      LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total 
permits: {}", _maxPreprocessConcurrency,
-          totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", 
_maxPreprocessConcurrency,
-        maxPreprocessConcurrency);
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
-    if (!_isServingQueries) {
-      LOGGER.info("Serving queries hasn't been enabled yet, not updating the 
permits with maxPreprocessConcurrency");
-      return;
-    }
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Updated total permits: {}", totalPermits());
-  }
-
-  private void 
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> 
changedConfigs,
-      Map<String, String> clusterConfigs) {
-    if (!changedConfigs.contains(
-        
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
 {
-      LOGGER.info("changedConfigs list indicates 
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
-          + "skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrencyBeforeServingQueries;
-    try {
-      maxPreprocessConcurrencyBeforeServingQueries =
-          
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: 
{}, not making change, fix config and "
-              + "try again", 
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
-      LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 
0, not making change, fix config "
-          + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries == 
_maxPreprocessConcurrencyBeforeServingQueries) {
-      LOGGER.info("No ZK update for 
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
-          _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} 
to: {}",
-        _maxPreprocessConcurrencyBeforeServingQueries, 
maxPreprocessConcurrencyBeforeServingQueries);
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    if (!_isServingQueries) {
-      LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated 
before serving queries was enabled, "
-          + "updating the permits");
-      _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
-      LOGGER.info("Updated total permits: {}", totalPermits());
-    }
-  }
-
-  /**
-   * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted.
-   * <p>
-   * {@link #release()} should be called after the segment preprocess 
completes. It is the responsibility of the caller
-   * to ensure that {@link #release()} is called exactly once for each call to 
this method.
-   *
-   * @throws InterruptedException if the current thread is interrupted
-   */
-  public void acquire()
-      throws InterruptedException {
-    _semaphore.acquire();
-  }
+  SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+  SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;
 
   /**
-   * Should be called after the segment index build completes. It is the 
responsibility of the caller to ensure that
-   * this method is called exactly once for each call to {@link #acquire()}.
+   * Constructor for SegmentPreprocessThrottler
+   * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to 
use for all indexes
+   * @param segmentStarTreePreprocessThrottler segment preprocess throttler to 
use for StarTree index
    */
-  public void release() {
-    _semaphore.release();
+  public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler 
segmentAllIndexPreprocessThrottler,
+      SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) {

Review Comment:
   Are they nullable? If so, we also need to do a null check on the caller side.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
  */
 package org.apache.pinot.segment.local.utils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Used to throttle the total concurrent index rebuilds that can happen on a 
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total 
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the 
following levels:
+ * - All index throttling
+ * - StarTree index throttling
  * Code paths that do no need to rebuild the index or which don't happen on 
the server need not utilize this throttler.
  */
-public class SegmentPreprocessThrottler implements 
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
 
-  /**
-   * _maxPreprocessConcurrency and 
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively 
disable
-   * throttling, this can be set to a very high value
-   */
-  private int _maxPreprocessConcurrency;
-  private int _maxPreprocessConcurrencyBeforeServingQueries;
-  private boolean _isServingQueries;
-  private final AdjustableSemaphore _semaphore;
-
-  /**
-   * @param maxPreprocessConcurrency configured preprocessing concurrency
-   * @param maxPreprocessConcurrencyBeforeServingQueries configured 
preprocessing concurrency before serving queries
-   * @param isServingQueries whether the server is ready to serve queries or 
not
-   */
-  public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int 
maxPreprocessConcurrencyBeforeServingQueries,
-      boolean isServingQueries) {
-    LOGGER.info("Initializing SegmentPreprocessThrottler, 
maxPreprocessConcurrency: {}, "
-            + "maxPreprocessConcurrencyBeforeServingQueries: {}, 
isServingQueries: {}",
-        maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
-    Preconditions.checkArgument(maxPreprocessConcurrency > 0,
-        "Max preprocess parallelism must be > 0, but found to be: " + 
maxPreprocessConcurrency);
-    Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 
0,
-        "Max preprocess parallelism before serving queries must be > 0, but 
found to be: "
-            + maxPreprocessConcurrencyBeforeServingQueries);
-
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    _isServingQueries = isServingQueries;
-
-    // maxConcurrentPreprocessesBeforeServingQueries is only used prior to 
serving queries and once the server is
-    // ready to serve queries this is not used again. This too is configurable 
via ZK CLUSTER config updates while the
-    // server is starting up.
-    int preprocessConcurrency = _maxPreprocessConcurrency;
-    if (!isServingQueries) {
-      preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
-      LOGGER.info("Serving queries is disabled, setting preprocess concurrency 
to: {}", preprocessConcurrency);
-    }
-    _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
-    LOGGER.info("Created semaphore with total permits: {}, available permits: 
{}", totalPermits(),
-        availablePermits());
-  }
-
-  public synchronized void startServingQueries() {
-    LOGGER.info("Serving queries is to be enabled, reset throttling threshold 
for segment preprocess concurrency, "
-        + "total permits: {}, available permits: {}", totalPermits(), 
availablePermits());
-    _isServingQueries = true;
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Reset throttling completed, new concurrency: {}, total 
permits: {}, available permits: {}",
-        _maxPreprocessConcurrency, totalPermits(), availablePermits());
-  }
-
-  @Override
-  public synchronized void onChange(Set<String> changedConfigs, Map<String, 
String> clusterConfigs) {
-    if (CollectionUtils.isEmpty(changedConfigs)) {
-      LOGGER.info("Skip updating SegmentPreprocessThrottler configs with 
unchanged clusterConfigs");
-      return;
-    }
-
-    LOGGER.info("Updating SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-    handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
-    handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, 
clusterConfigs);
-    LOGGER.info("Updated SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-  }
-
-  private void handleMaxPreprocessConcurrencyChange(Set<String> 
changedConfigs, Map<String, String> clusterConfigs) {
-    if 
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
 {
-      LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was 
not updated, skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String maxParallelSegmentPreprocessesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrency;
-    try {
-      maxPreprocessConcurrency = 
Integer.parseInt(maxParallelSegmentPreprocessesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making 
change, fix config and try again",
-          maxParallelSegmentPreprocessesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrency <= 0) {
-      LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making 
change, fix config and try again",
-          maxPreprocessConcurrency);
-      return;
-    }
-
-    if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
-      LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total 
permits: {}", _maxPreprocessConcurrency,
-          totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", 
_maxPreprocessConcurrency,
-        maxPreprocessConcurrency);
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
-    if (!_isServingQueries) {
-      LOGGER.info("Serving queries hasn't been enabled yet, not updating the 
permits with maxPreprocessConcurrency");
-      return;
-    }
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Updated total permits: {}", totalPermits());
-  }
-
-  private void 
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> 
changedConfigs,
-      Map<String, String> clusterConfigs) {
-    if (!changedConfigs.contains(
-        
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
 {
-      LOGGER.info("changedConfigs list indicates 
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
-          + "skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrencyBeforeServingQueries;
-    try {
-      maxPreprocessConcurrencyBeforeServingQueries =
-          
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: 
{}, not making change, fix config and "
-              + "try again", 
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
-      LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 
0, not making change, fix config "
-          + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries == 
_maxPreprocessConcurrencyBeforeServingQueries) {
-      LOGGER.info("No ZK update for 
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
-          _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} 
to: {}",
-        _maxPreprocessConcurrencyBeforeServingQueries, 
maxPreprocessConcurrencyBeforeServingQueries);
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    if (!_isServingQueries) {
-      LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated 
before serving queries was enabled, "
-          + "updating the permits");
-      _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
-      LOGGER.info("Updated total permits: {}", totalPermits());
-    }
-  }
-
-  /**
-   * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted.
-   * <p>
-   * {@link #release()} should be called after the segment preprocess 
completes. It is the responsibility of the caller
-   * to ensure that {@link #release()} is called exactly once for each call to 
this method.
-   *
-   * @throws InterruptedException if the current thread is interrupted
-   */
-  public void acquire()
-      throws InterruptedException {
-    _semaphore.acquire();
-  }
+  SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+  SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;

Review Comment:
   ```suggestion
     private final SegmentAllIndexPreprocessThrottler 
_segmentAllIndexPreprocessThrottler;
     private final SegmentStarTreePreprocessThrottler 
_segmentStarTreePreprocessThrottler;
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
  */
 package org.apache.pinot.segment.local.utils;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Used to throttle the total concurrent index rebuilds that can happen on a 
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total 
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the 
following levels:
+ * - All index throttling
+ * - StarTree index throttling
  * Code paths that do no need to rebuild the index or which don't happen on 
the server need not utilize this throttler.
  */
-public class SegmentPreprocessThrottler implements 
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
 
-  /**
-   * _maxPreprocessConcurrency and 
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively 
disable
-   * throttling, this can be set to a very high value
-   */
-  private int _maxPreprocessConcurrency;
-  private int _maxPreprocessConcurrencyBeforeServingQueries;
-  private boolean _isServingQueries;
-  private final AdjustableSemaphore _semaphore;
-
-  /**
-   * @param maxPreprocessConcurrency configured preprocessing concurrency
-   * @param maxPreprocessConcurrencyBeforeServingQueries configured 
preprocessing concurrency before serving queries
-   * @param isServingQueries whether the server is ready to serve queries or 
not
-   */
-  public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int 
maxPreprocessConcurrencyBeforeServingQueries,
-      boolean isServingQueries) {
-    LOGGER.info("Initializing SegmentPreprocessThrottler, 
maxPreprocessConcurrency: {}, "
-            + "maxPreprocessConcurrencyBeforeServingQueries: {}, 
isServingQueries: {}",
-        maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
-    Preconditions.checkArgument(maxPreprocessConcurrency > 0,
-        "Max preprocess parallelism must be > 0, but found to be: " + 
maxPreprocessConcurrency);
-    Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 
0,
-        "Max preprocess parallelism before serving queries must be > 0, but 
found to be: "
-            + maxPreprocessConcurrencyBeforeServingQueries);
-
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    _isServingQueries = isServingQueries;
-
-    // maxConcurrentPreprocessesBeforeServingQueries is only used prior to 
serving queries and once the server is
-    // ready to serve queries this is not used again. This too is configurable 
via ZK CLUSTER config updates while the
-    // server is starting up.
-    int preprocessConcurrency = _maxPreprocessConcurrency;
-    if (!isServingQueries) {
-      preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
-      LOGGER.info("Serving queries is disabled, setting preprocess concurrency 
to: {}", preprocessConcurrency);
-    }
-    _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
-    LOGGER.info("Created semaphore with total permits: {}, available permits: 
{}", totalPermits(),
-        availablePermits());
-  }
-
-  public synchronized void startServingQueries() {
-    LOGGER.info("Serving queries is to be enabled, reset throttling threshold 
for segment preprocess concurrency, "
-        + "total permits: {}, available permits: {}", totalPermits(), 
availablePermits());
-    _isServingQueries = true;
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Reset throttling completed, new concurrency: {}, total 
permits: {}, available permits: {}",
-        _maxPreprocessConcurrency, totalPermits(), availablePermits());
-  }
-
-  @Override
-  public synchronized void onChange(Set<String> changedConfigs, Map<String, 
String> clusterConfigs) {
-    if (CollectionUtils.isEmpty(changedConfigs)) {
-      LOGGER.info("Skip updating SegmentPreprocessThrottler configs with 
unchanged clusterConfigs");
-      return;
-    }
-
-    LOGGER.info("Updating SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-    handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
-    handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, 
clusterConfigs);
-    LOGGER.info("Updated SegmentPreprocessThrottler configs with latest 
clusterConfigs");
-  }
-
-  private void handleMaxPreprocessConcurrencyChange(Set<String> 
changedConfigs, Map<String, String> clusterConfigs) {
-    if 
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
 {
-      LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was 
not updated, skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
-    String maxParallelSegmentPreprocessesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrency;
-    try {
-      maxPreprocessConcurrency = 
Integer.parseInt(maxParallelSegmentPreprocessesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making 
change, fix config and try again",
-          maxParallelSegmentPreprocessesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrency <= 0) {
-      LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making 
change, fix config and try again",
-          maxPreprocessConcurrency);
-      return;
-    }
-
-    if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
-      LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total 
permits: {}", _maxPreprocessConcurrency,
-          totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", 
_maxPreprocessConcurrency,
-        maxPreprocessConcurrency);
-    _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
-    if (!_isServingQueries) {
-      LOGGER.info("Serving queries hasn't been enabled yet, not updating the 
permits with maxPreprocessConcurrency");
-      return;
-    }
-    _semaphore.setPermits(_maxPreprocessConcurrency);
-    LOGGER.info("Updated total permits: {}", totalPermits());
-  }
-
-  private void 
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> 
changedConfigs,
-      Map<String, String> clusterConfigs) {
-    if (!changedConfigs.contains(
-        
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
 {
-      LOGGER.info("changedConfigs list indicates 
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
-          + "skipping updates");
-      return;
-    }
-
-    String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String defaultConfigValue = 
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
-    String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
-        clusterConfigs == null ? defaultConfigValue : 
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
-    int maxPreprocessConcurrencyBeforeServingQueries;
-    try {
-      maxPreprocessConcurrencyBeforeServingQueries =
-          
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-    } catch (Exception e) {
-      LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: 
{}, not making change, fix config and "
-              + "try again", 
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
-      LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 
0, not making change, fix config "
-          + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
-      return;
-    }
-
-    if (maxPreprocessConcurrencyBeforeServingQueries == 
_maxPreprocessConcurrencyBeforeServingQueries) {
-      LOGGER.info("No ZK update for 
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
-          _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
-      return;
-    }
-
-    LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} 
to: {}",
-        _maxPreprocessConcurrencyBeforeServingQueries, 
maxPreprocessConcurrencyBeforeServingQueries);
-    _maxPreprocessConcurrencyBeforeServingQueries = 
maxPreprocessConcurrencyBeforeServingQueries;
-    if (!_isServingQueries) {
-      LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated 
before serving queries was enabled, "
-          + "updating the permits");
-      _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
-      LOGGER.info("Updated total permits: {}", totalPermits());
-    }
-  }
-
-  /**
-   * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted.
-   * <p>
-   * {@link #release()} should be called after the segment preprocess 
completes. It is the responsibility of the caller
-   * to ensure that {@link #release()} is called exactly once for each call to 
this method.
-   *
-   * @throws InterruptedException if the current thread is interrupted
-   */
-  public void acquire()
-      throws InterruptedException {
-    _semaphore.acquire();
-  }
+  SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+  SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;
 
   /**
-   * Should be called after the segment index build completes. It is the 
responsibility of the caller to ensure that
-   * this method is called exactly once for each call to {@link #acquire()}.
+   * Constructor for SegmentPreprocessThrottler
+   * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to 
use for all indexes
+   * @param segmentStarTreePreprocessThrottler segment preprocess throttler to 
use for StarTree index
    */
-  public void release() {
-    _semaphore.release();
+  public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler 
segmentAllIndexPreprocessThrottler,
+      SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) {
+    LOGGER.info("Initializing SegmentPreprocessThrottler");

Review Comment:
   This info log is not very useful. I assume we are logging details within the 
individual throttlers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to