Jackie-Jiang commented on code in PR #14943: URL: https://github.com/apache/pinot/pull/14943#discussion_r1938058304
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java: ########## @@ -18,208 +18,40 @@ */ package org.apache.pinot.segment.local.utils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Map; -import java.util.Set; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.concurrency.AdjustableSemaphore; -import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; -import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Used to throttle the total concurrent index rebuilds that can happen on a given Pinot server. + * Contains all the segment preprocess throttlers used to control the total index rebuilds that can happen on a given + * Pinot server. For now this class supports index rebuild throttling at the following levels: + * - All index throttling + * - StarTree index throttling * Code paths that do no need to rebuild the index or which don't happen on the server need not utilize this throttler. */ -public class SegmentPreprocessThrottler implements PinotClusterConfigChangeListener { +public class SegmentPreprocessThrottler { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessThrottler.class); - /** - * _maxPreprocessConcurrency and _maxConcurrentPreprocessesBeforeServingQueries must be > 0. 
To effectively disable - * throttling, this can be set to a very high value - */ - private int _maxPreprocessConcurrency; - private int _maxPreprocessConcurrencyBeforeServingQueries; - private boolean _isServingQueries; - private final AdjustableSemaphore _semaphore; - - /** - * @param maxPreprocessConcurrency configured preprocessing concurrency - * @param maxPreprocessConcurrencyBeforeServingQueries configured preprocessing concurrency before serving queries - * @param isServingQueries whether the server is ready to serve queries or not - */ - public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int maxPreprocessConcurrencyBeforeServingQueries, - boolean isServingQueries) { - LOGGER.info("Initializing SegmentPreprocessThrottler, maxPreprocessConcurrency: {}, " - + "maxPreprocessConcurrencyBeforeServingQueries: {}, isServingQueries: {}", - maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, isServingQueries); - Preconditions.checkArgument(maxPreprocessConcurrency > 0, - "Max preprocess parallelism must be > 0, but found to be: " + maxPreprocessConcurrency); - Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 0, - "Max preprocess parallelism before serving queries must be > 0, but found to be: " - + maxPreprocessConcurrencyBeforeServingQueries); - - _maxPreprocessConcurrency = maxPreprocessConcurrency; - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - _isServingQueries = isServingQueries; - - // maxConcurrentPreprocessesBeforeServingQueries is only used prior to serving queries and once the server is - // ready to serve queries this is not used again. This too is configurable via ZK CLUSTER config updates while the - // server is starting up. 
- int preprocessConcurrency = _maxPreprocessConcurrency; - if (!isServingQueries) { - preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries; - LOGGER.info("Serving queries is disabled, setting preprocess concurrency to: {}", preprocessConcurrency); - } - _semaphore = new AdjustableSemaphore(preprocessConcurrency, true); - LOGGER.info("Created semaphore with total permits: {}, available permits: {}", totalPermits(), - availablePermits()); - } - - public synchronized void startServingQueries() { - LOGGER.info("Serving queries is to be enabled, reset throttling threshold for segment preprocess concurrency, " - + "total permits: {}, available permits: {}", totalPermits(), availablePermits()); - _isServingQueries = true; - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Reset throttling completed, new concurrency: {}, total permits: {}, available permits: {}", - _maxPreprocessConcurrency, totalPermits(), availablePermits()); - } - - @Override - public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (CollectionUtils.isEmpty(changedConfigs)) { - LOGGER.info("Skip updating SegmentPreprocessThrottler configs with unchanged clusterConfigs"); - return; - } - - LOGGER.info("Updating SegmentPreprocessThrottler configs with latest clusterConfigs"); - handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs); - handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, clusterConfigs); - LOGGER.info("Updated SegmentPreprocessThrottler configs with latest clusterConfigs"); - } - - private void handleMaxPreprocessConcurrencyChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was not updated, skipping updates"); - return; - } - - String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String maxParallelSegmentPreprocessesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrency; - try { - maxPreprocessConcurrency = Integer.parseInt(maxParallelSegmentPreprocessesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making change, fix config and try again", - maxParallelSegmentPreprocessesStr); - return; - } - - if (maxPreprocessConcurrency <= 0) { - LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making change, fix config and try again", - maxPreprocessConcurrency); - return; - } - - if (maxPreprocessConcurrency == _maxPreprocessConcurrency) { - LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total permits: {}", _maxPreprocessConcurrency, - totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", _maxPreprocessConcurrency, - maxPreprocessConcurrency); - _maxPreprocessConcurrency = maxPreprocessConcurrency; - - if (!_isServingQueries) { - LOGGER.info("Serving queries hasn't been enabled yet, not updating the permits with maxPreprocessConcurrency"); - return; - } - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - - private void handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> changedConfigs, - Map<String, String> clusterConfigs) { - if (!changedConfigs.contains( - CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrencyBeforeServingQueries was not updated, " - + "skipping updates"); - return; - } - - String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String maxParallelSegmentPreprocessesBeforeServingQueriesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrencyBeforeServingQueries; - try { - maxPreprocessConcurrencyBeforeServingQueries = - Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: {}, not making change, fix config and " - + "try again", maxParallelSegmentPreprocessesBeforeServingQueriesStr); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries <= 0) { - LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 0, not making change, fix config " - + "and try again", maxPreprocessConcurrencyBeforeServingQueries); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries == _maxPreprocessConcurrencyBeforeServingQueries) { - LOGGER.info("No ZK update for maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}", - _maxPreprocessConcurrencyBeforeServingQueries, totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} to: {}", - _maxPreprocessConcurrencyBeforeServingQueries, maxPreprocessConcurrencyBeforeServingQueries); - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - if (!_isServingQueries) { - LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated before serving queries was enabled, " - + "updating the permits"); - _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - } - - /** - * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted. - * <p> - * {@link #release()} should be called after the segment preprocess completes. It is the responsibility of the caller - * to ensure that {@link #release()} is called exactly once for each call to this method. - * - * @throws InterruptedException if the current thread is interrupted - */ - public void acquire() - throws InterruptedException { - _semaphore.acquire(); - } + SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler; + SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler; /** - * Should be called after the segment index build completes. It is the responsibility of the caller to ensure that - * this method is called exactly once for each call to {@link #acquire()}. + * Constructor for SegmentPreprocessThrottler + * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to use for all indexes + * @param segmentStarTreePreprocessThrottler segment preprocess throttler to use for StarTree index */ - public void release() { - _semaphore.release(); + public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler, + SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) { Review Comment: Are they nullable? 
If so, we also need to do a null check on the caller side ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java: ########## @@ -18,208 +18,40 @@ */ package org.apache.pinot.segment.local.utils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Map; -import java.util.Set; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.concurrency.AdjustableSemaphore; -import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; -import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Used to throttle the total concurrent index rebuilds that can happen on a given Pinot server. + * Contains all the segment preprocess throttlers used to control the total index rebuilds that can happen on a given + * Pinot server. For now this class supports index rebuild throttling at the following levels: + * - All index throttling + * - StarTree index throttling * Code paths that do no need to rebuild the index or which don't happen on the server need not utilize this throttler. */ -public class SegmentPreprocessThrottler implements PinotClusterConfigChangeListener { +public class SegmentPreprocessThrottler { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessThrottler.class); - /** - * _maxPreprocessConcurrency and _maxConcurrentPreprocessesBeforeServingQueries must be > 0.
To effectively disable - * throttling, this can be set to a very high value - */ - private int _maxPreprocessConcurrency; - private int _maxPreprocessConcurrencyBeforeServingQueries; - private boolean _isServingQueries; - private final AdjustableSemaphore _semaphore; - - /** - * @param maxPreprocessConcurrency configured preprocessing concurrency - * @param maxPreprocessConcurrencyBeforeServingQueries configured preprocessing concurrency before serving queries - * @param isServingQueries whether the server is ready to serve queries or not - */ - public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int maxPreprocessConcurrencyBeforeServingQueries, - boolean isServingQueries) { - LOGGER.info("Initializing SegmentPreprocessThrottler, maxPreprocessConcurrency: {}, " - + "maxPreprocessConcurrencyBeforeServingQueries: {}, isServingQueries: {}", - maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, isServingQueries); - Preconditions.checkArgument(maxPreprocessConcurrency > 0, - "Max preprocess parallelism must be > 0, but found to be: " + maxPreprocessConcurrency); - Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 0, - "Max preprocess parallelism before serving queries must be > 0, but found to be: " - + maxPreprocessConcurrencyBeforeServingQueries); - - _maxPreprocessConcurrency = maxPreprocessConcurrency; - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - _isServingQueries = isServingQueries; - - // maxConcurrentPreprocessesBeforeServingQueries is only used prior to serving queries and once the server is - // ready to serve queries this is not used again. This too is configurable via ZK CLUSTER config updates while the - // server is starting up. 
- int preprocessConcurrency = _maxPreprocessConcurrency; - if (!isServingQueries) { - preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries; - LOGGER.info("Serving queries is disabled, setting preprocess concurrency to: {}", preprocessConcurrency); - } - _semaphore = new AdjustableSemaphore(preprocessConcurrency, true); - LOGGER.info("Created semaphore with total permits: {}, available permits: {}", totalPermits(), - availablePermits()); - } - - public synchronized void startServingQueries() { - LOGGER.info("Serving queries is to be enabled, reset throttling threshold for segment preprocess concurrency, " - + "total permits: {}, available permits: {}", totalPermits(), availablePermits()); - _isServingQueries = true; - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Reset throttling completed, new concurrency: {}, total permits: {}, available permits: {}", - _maxPreprocessConcurrency, totalPermits(), availablePermits()); - } - - @Override - public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (CollectionUtils.isEmpty(changedConfigs)) { - LOGGER.info("Skip updating SegmentPreprocessThrottler configs with unchanged clusterConfigs"); - return; - } - - LOGGER.info("Updating SegmentPreprocessThrottler configs with latest clusterConfigs"); - handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs); - handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, clusterConfigs); - LOGGER.info("Updated SegmentPreprocessThrottler configs with latest clusterConfigs"); - } - - private void handleMaxPreprocessConcurrencyChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was not updated, skipping updates"); - return; - } - - String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String maxParallelSegmentPreprocessesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrency; - try { - maxPreprocessConcurrency = Integer.parseInt(maxParallelSegmentPreprocessesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making change, fix config and try again", - maxParallelSegmentPreprocessesStr); - return; - } - - if (maxPreprocessConcurrency <= 0) { - LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making change, fix config and try again", - maxPreprocessConcurrency); - return; - } - - if (maxPreprocessConcurrency == _maxPreprocessConcurrency) { - LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total permits: {}", _maxPreprocessConcurrency, - totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", _maxPreprocessConcurrency, - maxPreprocessConcurrency); - _maxPreprocessConcurrency = maxPreprocessConcurrency; - - if (!_isServingQueries) { - LOGGER.info("Serving queries hasn't been enabled yet, not updating the permits with maxPreprocessConcurrency"); - return; - } - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - - private void handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> changedConfigs, - Map<String, String> clusterConfigs) { - if (!changedConfigs.contains( - CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrencyBeforeServingQueries was not updated, " - + "skipping updates"); - return; - } - - String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String maxParallelSegmentPreprocessesBeforeServingQueriesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrencyBeforeServingQueries; - try { - maxPreprocessConcurrencyBeforeServingQueries = - Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: {}, not making change, fix config and " - + "try again", maxParallelSegmentPreprocessesBeforeServingQueriesStr); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries <= 0) { - LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 0, not making change, fix config " - + "and try again", maxPreprocessConcurrencyBeforeServingQueries); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries == _maxPreprocessConcurrencyBeforeServingQueries) { - LOGGER.info("No ZK update for maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}", - _maxPreprocessConcurrencyBeforeServingQueries, totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} to: {}", - _maxPreprocessConcurrencyBeforeServingQueries, maxPreprocessConcurrencyBeforeServingQueries); - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - if (!_isServingQueries) { - LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated before serving queries was enabled, " - + "updating the permits"); - _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - } - - /** - * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted. - * <p> - * {@link #release()} should be called after the segment preprocess completes. It is the responsibility of the caller - * to ensure that {@link #release()} is called exactly once for each call to this method. - * - * @throws InterruptedException if the current thread is interrupted - */ - public void acquire() - throws InterruptedException { - _semaphore.acquire(); - } + SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler; + SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler; Review Comment: ```suggestion private final SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler; private final SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler; ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java: ########## @@ -18,208 +18,40 @@ */ package org.apache.pinot.segment.local.utils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Map; -import java.util.Set; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.concurrency.AdjustableSemaphore; -import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; -import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Used to throttle the total concurrent index rebuilds that can happen on a given Pinot server. + * Contains all the segment preprocess throttlers used to control the total index rebuilds that can happen on a given + * Pinot server. For now this class supports index rebuild throttling at the following levels: + * - All index throttling + * - StarTree index throttling * Code paths that do no need to rebuild the index or which don't happen on the server need not utilize this throttler. 
*/ -public class SegmentPreprocessThrottler implements PinotClusterConfigChangeListener { +public class SegmentPreprocessThrottler { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessThrottler.class); - /** - * _maxPreprocessConcurrency and _maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively disable - * throttling, this can be set to a very high value - */ - private int _maxPreprocessConcurrency; - private int _maxPreprocessConcurrencyBeforeServingQueries; - private boolean _isServingQueries; - private final AdjustableSemaphore _semaphore; - - /** - * @param maxPreprocessConcurrency configured preprocessing concurrency - * @param maxPreprocessConcurrencyBeforeServingQueries configured preprocessing concurrency before serving queries - * @param isServingQueries whether the server is ready to serve queries or not - */ - public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int maxPreprocessConcurrencyBeforeServingQueries, - boolean isServingQueries) { - LOGGER.info("Initializing SegmentPreprocessThrottler, maxPreprocessConcurrency: {}, " - + "maxPreprocessConcurrencyBeforeServingQueries: {}, isServingQueries: {}", - maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, isServingQueries); - Preconditions.checkArgument(maxPreprocessConcurrency > 0, - "Max preprocess parallelism must be > 0, but found to be: " + maxPreprocessConcurrency); - Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries > 0, - "Max preprocess parallelism before serving queries must be > 0, but found to be: " - + maxPreprocessConcurrencyBeforeServingQueries); - - _maxPreprocessConcurrency = maxPreprocessConcurrency; - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - _isServingQueries = isServingQueries; - - // maxConcurrentPreprocessesBeforeServingQueries is only used prior to serving queries and once the server is - // ready to serve queries this is not 
used again. This too is configurable via ZK CLUSTER config updates while the - // server is starting up. - int preprocessConcurrency = _maxPreprocessConcurrency; - if (!isServingQueries) { - preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries; - LOGGER.info("Serving queries is disabled, setting preprocess concurrency to: {}", preprocessConcurrency); - } - _semaphore = new AdjustableSemaphore(preprocessConcurrency, true); - LOGGER.info("Created semaphore with total permits: {}, available permits: {}", totalPermits(), - availablePermits()); - } - - public synchronized void startServingQueries() { - LOGGER.info("Serving queries is to be enabled, reset throttling threshold for segment preprocess concurrency, " - + "total permits: {}, available permits: {}", totalPermits(), availablePermits()); - _isServingQueries = true; - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Reset throttling completed, new concurrency: {}, total permits: {}, available permits: {}", - _maxPreprocessConcurrency, totalPermits(), availablePermits()); - } - - @Override - public synchronized void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (CollectionUtils.isEmpty(changedConfigs)) { - LOGGER.info("Skip updating SegmentPreprocessThrottler configs with unchanged clusterConfigs"); - return; - } - - LOGGER.info("Updating SegmentPreprocessThrottler configs with latest clusterConfigs"); - handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs); - handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs, clusterConfigs); - LOGGER.info("Updated SegmentPreprocessThrottler configs with latest clusterConfigs"); - } - - private void handleMaxPreprocessConcurrencyChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) { - if (!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was not 
updated, skipping updates"); - return; - } - - String configName = CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM; - String maxParallelSegmentPreprocessesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrency; - try { - maxPreprocessConcurrency = Integer.parseInt(maxParallelSegmentPreprocessesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making change, fix config and try again", - maxParallelSegmentPreprocessesStr); - return; - } - - if (maxPreprocessConcurrency <= 0) { - LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making change, fix config and try again", - maxPreprocessConcurrency); - return; - } - - if (maxPreprocessConcurrency == _maxPreprocessConcurrency) { - LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total permits: {}", _maxPreprocessConcurrency, - totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}", _maxPreprocessConcurrency, - maxPreprocessConcurrency); - _maxPreprocessConcurrency = maxPreprocessConcurrency; - - if (!_isServingQueries) { - LOGGER.info("Serving queries hasn't been enabled yet, not updating the permits with maxPreprocessConcurrency"); - return; - } - _semaphore.setPermits(_maxPreprocessConcurrency); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - - private void handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String> changedConfigs, - Map<String, String> clusterConfigs) { - if (!changedConfigs.contains( - CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)) { - LOGGER.info("changedConfigs list indicates maxPreprocessConcurrencyBeforeServingQueries was not updated, " - + "skipping updates"); - return; - } - - String configName = 
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String defaultConfigValue = CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES; - String maxParallelSegmentPreprocessesBeforeServingQueriesStr = - clusterConfigs == null ? defaultConfigValue : clusterConfigs.getOrDefault(configName, defaultConfigValue); - - int maxPreprocessConcurrencyBeforeServingQueries; - try { - maxPreprocessConcurrencyBeforeServingQueries = - Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr); - } catch (Exception e) { - LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set: {}, not making change, fix config and " - + "try again", maxParallelSegmentPreprocessesBeforeServingQueriesStr); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries <= 0) { - LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be > 0, not making change, fix config " - + "and try again", maxPreprocessConcurrencyBeforeServingQueries); - return; - } - - if (maxPreprocessConcurrencyBeforeServingQueries == _maxPreprocessConcurrencyBeforeServingQueries) { - LOGGER.info("No ZK update for maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}", - _maxPreprocessConcurrencyBeforeServingQueries, totalPermits()); - return; - } - - LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {} to: {}", - _maxPreprocessConcurrencyBeforeServingQueries, maxPreprocessConcurrencyBeforeServingQueries); - _maxPreprocessConcurrencyBeforeServingQueries = maxPreprocessConcurrencyBeforeServingQueries; - if (!_isServingQueries) { - LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated before serving queries was enabled, " - + "updating the permits"); - _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries); - LOGGER.info("Updated total permits: {}", totalPermits()); - } - } - - /** - * Block trying to acquire the semaphore to perform the segment index 
rebuild steps unless interrupted. - * <p> - * {@link #release()} should be called after the segment preprocess completes. It is the responsibility of the caller - * to ensure that {@link #release()} is called exactly once for each call to this method. - * - * @throws InterruptedException if the current thread is interrupted - */ - public void acquire() - throws InterruptedException { - _semaphore.acquire(); - } + SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler; + SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler; /** - * Should be called after the segment index build completes. It is the responsibility of the caller to ensure that - * this method is called exactly once for each call to {@link #acquire()}. + * Constructor for SegmentPreprocessThrottler + * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to use for all indexes + * @param segmentStarTreePreprocessThrottler segment preprocess throttler to use for StarTree index */ - public void release() { - _semaphore.release(); + public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler, + SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) { + LOGGER.info("Initializing SegmentPreprocessThrottler"); Review Comment: This info is not very useful. I assume we are logging details within the individual throttlers? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org