This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 763c732d4f Make SegmentOperationsThrottler more extensible and modify interfaces for upsert and dedup to take this as an argument (#15973) 763c732d4f is described below commit 763c732d4fed14c5429ef99785ee47e0b0815319 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Tue Jun 10 10:50:36 2025 -0700 Make SegmentOperationsThrottler more extensible and modify interfaces for upsert and dedup to take this as an argument (#15973) --- .../manager/realtime/RealtimeTableDataManager.java | 4 +- .../local/dedup/BaseTableDedupMetadataManager.java | 6 +- .../local/dedup/TableDedupMetadataManager.java | 4 +- .../dedup/TableDedupMetadataManagerFactory.java | 7 +- .../upsert/BaseTableUpsertMetadataManager.java | 6 +- .../local/upsert/TableUpsertMetadataManager.java | 4 +- .../upsert/TableUpsertMetadataManagerFactory.java | 5 +- .../utils/BaseSegmentOperationsThrottler.java | 47 ++++++++----- .../utils/SegmentAllIndexPreprocessThrottler.java | 13 +++- .../local/utils/SegmentDownloadThrottler.java | 17 ++--- .../utils/SegmentStarTreePreprocessThrottler.java | 11 ++- .../TableDedupMetadataManagerFactoryTest.java | 2 +- .../mutable/MutableSegmentDedupTest.java | 3 +- .../MutableSegmentImplUpsertComparisonColTest.java | 3 +- .../mutable/MutableSegmentImplUpsertTest.java | 2 +- .../TableUpsertMetadataManagerFactoryTest.java | 14 ++-- .../server/starter/helix/BaseServerStarter.java | 79 +++++++++++++--------- .../apache/pinot/spi/utils/CommonConstants.java | 3 + 18 files changed, 149 insertions(+), 81 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 15ebdc56eb..004755f47c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -207,14 +207,14 @@ public class RealtimeTableDataManager extends BaseTableDataManager { if (tableConfig.isDedupEnabled()) { _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(), tableConfig, schema, - this); + this, _segmentOperationsThrottler); } if (tableConfig.isUpsertEnabled()) { Preconditions.checkState(_tableDedupMetadataManager == null, "Dedup and upsert cannot be both enabled for table: %s", _tableNameWithType); _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(), tableConfig, schema, - this); + this, _segmentOperationsThrottler); } _enforceConsumptionInOrder = isEnforceConsumptionInOrder(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java index e2c8f986e2..61988fd4ad 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java @@ -23,8 +23,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -40,11 +42,13 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat protected final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>(); protected String _tableNameWithType; protected DedupContext _context; + protected SegmentOperationsThrottler _segmentOperationsThrottler; @Override public void init(PinotConfiguration instanceDedupConfig, TableConfig tableConfig, Schema schema, - TableDataManager tableDataManager) { + TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler segmentOperationsThrottler) { _tableNameWithType = tableConfig.getTableName(); + _segmentOperationsThrottler = segmentOperationsThrottler; Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be enabled for table: %s", _tableNameWithType); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java index edc971bea2..03007c9034 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java @@ -19,7 +19,9 @@ package org.apache.pinot.segment.local.dedup; import java.io.Closeable; +import javax.annotation.Nullable; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -30,7 +32,7 @@ public interface TableDedupMetadataManager extends Closeable { * Initialize TableDedupMetadataManager. */ void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, Schema schema, - TableDataManager tableDataManager); + TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); /** * Create a new PartitionDedupMetadataManager if not present already, otherwise return existing one. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java index 473da5e392..ae34d71464 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java @@ -19,8 +19,10 @@ package org.apache.pinot.segment.local.dedup; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -37,7 +39,8 @@ public class TableDedupMetadataManagerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(TableDedupMetadataManagerFactory.class); public static TableDedupMetadataManager create(PinotConfiguration instanceDedupConfig, TableConfig tableConfig, - Schema schema, TableDataManager tableDataManager) { + Schema schema, TableDataManager tableDataManager, + @Nullable SegmentOperationsThrottler segmentOperationsThrottler) { String tableNameWithType = tableConfig.getTableName(); Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be enabled for table: %s", tableNameWithType); DedupConfig dedupConfig = tableConfig.getDedupConfig(); @@ -63,7 +66,7 @@ public class TableDedupMetadataManagerFactory { LOGGER.info("Creating ConcurrentMapTableDedupMetadataManager for table: {}", tableNameWithType); metadataManager = new ConcurrentMapTableDedupMetadataManager(); } - metadataManager.init(instanceDedupConfig, tableConfig, schema, tableDataManager); + metadataManager.init(instanceDedupConfig, tableConfig, schema, tableDataManager, segmentOperationsThrottler); return metadataManager; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 8d2318148e..539ae1a95c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -20,9 +20,11 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.base.Preconditions; import java.util.List; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; @@ -38,11 +40,13 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad protected String _tableNameWithType; protected UpsertContext _context; + protected SegmentOperationsThrottler _segmentOperationsThrottler; @Override public void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, Schema schema, - TableDataManager tableDataManager) { + TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler segmentOperationsThrottler) { _tableNameWithType = tableConfig.getTableName(); + _segmentOperationsThrottler = segmentOperationsThrottler; Preconditions.checkArgument(tableConfig.isUpsertEnabled(), "Upsert must be enabled for table: %s", _tableNameWithType); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java index e4071fa08c..3a8012c539 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java @@ -23,8 +23,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -39,7 +41,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; public interface TableUpsertMetadataManager extends Closeable { void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, Schema schema, - TableDataManager tableDataManager); + TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java index 8d81b92588..1151220098 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java @@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; @@ -37,7 +38,7 @@ public class TableUpsertMetadataManagerFactory { private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class); public static TableUpsertMetadataManager create(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, - Schema schema, TableDataManager tableDataManager) { + Schema schema, TableDataManager tableDataManager, SegmentOperationsThrottler segmentOperationsThrottler) { String tableNameWithType = tableConfig.getTableName(); Preconditions.checkArgument(tableConfig.isUpsertEnabled(), "Upsert must be enabled for table: %s", tableNameWithType); @@ -64,7 +65,7 @@ public class TableUpsertMetadataManagerFactory { LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table: {}", tableNameWithType); metadataManager = new ConcurrentMapTableUpsertMetadataManager(); } - metadataManager.init(instanceUpsertConfig, tableConfig, schema, tableDataManager); + metadataManager.init(instanceUpsertConfig, tableConfig, schema, tableDataManager, segmentOperationsThrottler); return metadataManager; } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java index 4e8de0cdf8..717e19aab8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.concurrency.AdjustableSemaphore; -import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; import org.slf4j.Logger; @@ -45,8 +44,6 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf protected int _maxConcurrency; protected int _maxConcurrencyBeforeServingQueries; protected boolean _isServingQueries; - protected ServerGauge _thresholdGauge; - protected ServerGauge _countGauge; private AtomicInteger _numSegmentsAcquiredSemaphore; private final Logger _logger; @@ -55,12 +52,10 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf * @param maxConcurrency configured concurrency * @param maxConcurrencyBeforeServingQueries configured concurrency before serving queries * @param isServingQueries whether the server is ready to serve queries or not - * @param thresholdGauge gauge metric to track the throttle thresholds - * @param countGauge gauge metric to track the number of segments undergoing the given operation * @param logger logger to use */ public BaseSegmentOperationsThrottler(int maxConcurrency, int maxConcurrencyBeforeServingQueries, - boolean isServingQueries, ServerGauge thresholdGauge, ServerGauge countGauge, Logger logger) { + boolean isServingQueries, Logger logger) { _logger = logger; _logger.info("Initializing SegmentOperationsThrottler, maxConcurrency: {}, maxConcurrencyBeforeServingQueries: {}, " + "isServingQueries: {}", @@ -72,8 +67,6 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf _maxConcurrency = maxConcurrency; _maxConcurrencyBeforeServingQueries = maxConcurrencyBeforeServingQueries; _isServingQueries = isServingQueries; - _thresholdGauge = thresholdGauge; - _countGauge = countGauge; // maxConcurrencyBeforeServingQueries is only used prior to serving queries and once the server is // ready to serve queries this is not used again. This too is configurable via ZK CLUSTER config updates while the @@ -90,6 +83,18 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf availablePermits()); } + /** + * Updates the throttle threshold metric + * @param value value to update the metric to + */ + public abstract void updateThresholdMetric(int value); + + /** + * Updates the throttle count metric + * @param value value to update the metric to + */ + public abstract void updateCountMetric(int value); + /** * The ServerMetrics may be created after these throttle objects are created. In that case, the initialization that * happens in the constructor may have occurred on the NOOP metrics. This should be called after the server metrics @@ -99,8 +104,8 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf */ public void initializeMetrics() { _serverMetrics = ServerMetrics.get(); - _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _semaphore.getTotalPermits()); - _serverMetrics.setValueOfGlobalGauge(_countGauge, 0); + updateThresholdMetric(_semaphore.getTotalPermits()); + updateCountMetric(0); } public synchronized void startServingQueries() { @@ -108,7 +113,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf + "total permits: {}, available permits: {}", totalPermits(), availablePermits()); _isServingQueries = true; _semaphore.setPermits(_maxConcurrency); - _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency); + updateThresholdMetric(_maxConcurrency); _logger.info("Reset throttling completed, new concurrency: {}, total permits: {}, available permits: {}", _maxConcurrency, totalPermits(), availablePermits()); } @@ -152,7 +157,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf return; } _semaphore.setPermits(_maxConcurrency); - _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency); + updateThresholdMetric(_maxConcurrency); _logger.info("Updated total permits: {}", totalPermits()); } @@ -193,7 +198,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf if (!_isServingQueries) { _logger.info("config: {} was updated before serving queries was enabled, updating the permits", configName); _semaphore.setPermits(_maxConcurrencyBeforeServingQueries); - _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrencyBeforeServingQueries); + updateThresholdMetric(_maxConcurrencyBeforeServingQueries); _logger.info("Updated total permits: {}", totalPermits()); } } @@ -209,7 +214,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf public void acquire() throws InterruptedException { _semaphore.acquire(); - _serverMetrics.setValueOfGlobalGauge(_countGauge, _numSegmentsAcquiredSemaphore.incrementAndGet()); + updateCountMetric(_numSegmentsAcquiredSemaphore.incrementAndGet()); } /** @@ -218,7 +223,15 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf */ public void release() { _semaphore.release(); - _serverMetrics.setValueOfGlobalGauge(_countGauge, _numSegmentsAcquiredSemaphore.decrementAndGet()); + updateCountMetric(_numSegmentsAcquiredSemaphore.decrementAndGet()); + } + + /** + * Get the estimated number of threads waiting for the semaphore + * @return the estimated queue length + */ + public int getQueueLength() { + return _semaphore.getQueueLength(); } /** @@ -226,7 +239,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf * @return number of available permits */ @VisibleForTesting - protected int availablePermits() { + public int availablePermits() { return _semaphore.availablePermits(); } @@ -235,7 +248,7 @@ public abstract class BaseSegmentOperationsThrottler implements PinotClusterConf * @return total number of permits */ @VisibleForTesting - protected int totalPermits() { + public int totalPermits() { return _semaphore.getTotalPermits(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java index 8a9a5251eb..6243ba5d44 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java @@ -40,8 +40,7 @@ public class SegmentAllIndexPreprocessThrottler extends BaseSegmentOperationsThr */ public SegmentAllIndexPreprocessThrottler(int maxPreprocessConcurrency, int maxPreprocessConcurrencyBeforeServingQueries, boolean isServingQueries) { - super(maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, isServingQueries, - ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD, ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT, LOGGER); + super(maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, isServingQueries, LOGGER); } @Override @@ -60,4 +59,14 @@ public class SegmentAllIndexPreprocessThrottler extends BaseSegmentOperationsThr CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES); LOGGER.info("Updated SegmentAllIndexPreprocessThrottler configs with latest clusterConfigs"); } + + @Override + public void updateThresholdMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD, value); + } + + @Override + public void updateCountMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT, value); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java index 1aec9fbaf7..906c58d013 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java @@ -49,8 +49,7 @@ public class SegmentDownloadThrottler extends BaseSegmentOperationsThrottler { */ public SegmentDownloadThrottler(int maxDownloadConcurrency, int maxDownloadConcurrencyBeforeServingQueries, boolean isServingQueries) { - super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, isServingQueries, - ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD, ServerGauge.SEGMENT_DOWNLOAD_COUNT, LOGGER); + super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, isServingQueries, LOGGER); } @Override @@ -70,11 +69,13 @@ public class SegmentDownloadThrottler extends BaseSegmentOperationsThrottler { LOGGER.info("Updated SegmentDownloadThrottler configs with latest clusterConfigs"); } - /** - * Get the estimated number of threads waiting for the semaphore - * @return the estimated queue length - */ - public int getQueueLength() { - return _semaphore.getQueueLength(); + @Override + public void updateThresholdMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD, value); + } + + @Override + public void updateCountMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_COUNT, value); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java index 531e90b44f..cf3e6216d3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java @@ -42,7 +42,6 @@ public class SegmentStarTreePreprocessThrottler extends BaseSegmentOperationsThr public SegmentStarTreePreprocessThrottler(int maxStarTreePreprocessConcurrency, int maxStarTreePreprocessConcurrencyBeforeServingQueries, boolean isServingQueries) { super(maxStarTreePreprocessConcurrency, maxStarTreePreprocessConcurrencyBeforeServingQueries, isServingQueries, - ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD, ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT, LOGGER); } @@ -62,4 +61,14 @@ public class SegmentStarTreePreprocessThrottler extends BaseSegmentOperationsThr CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES); LOGGER.info("Updated SegmentStarTreePreprocessThrottler configs with latest clusterConfigs"); } + + @Override + public void updateThresholdMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD, value); + } + + @Override + public void updateCountMetric(int value) { + _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT, value); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java index 7c2f626ff7..02e82df143 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java @@ -86,7 +86,7 @@ public class TableDedupMetadataManagerFactoryTest { TableDataManager tableDataManager, boolean expected) throws IOException { try (TableDedupMetadataManager tableDedupMetadataManager = TableDedupMetadataManagerFactory.create( - instanceDedupConfig, tableConfig, schema, tableDataManager)) { + instanceDedupConfig, tableConfig, schema, tableDataManager, null)) { assertEquals(tableDedupMetadataManager.getContext().isPreloadEnabled(), expected); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java index 34d49ec1f2..a3ca692e0a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java @@ -119,7 +119,8 @@ public class MutableSegmentDedupTest implements PinotBuffersAfterMethodCheckRule .build(); TableDataManager tableDataManager = mock(TableDataManager.class); when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR); - return TableDedupMetadataManagerFactory.create(new PinotConfiguration(), tableConfig, schema, tableDataManager); + return TableDedupMetadataManagerFactory.create(new PinotConfiguration(), tableConfig, schema, tableDataManager, + null); } public List<Map<String, String>> loadJsonFile(String filePath) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index 378afbb6cd..1588de8f91 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -87,7 +87,8 @@ public class MutableSegmentImplUpsertComparisonColTest implements PinotBuffersAf _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); TableUpsertMetadataManager tableUpsertMetadataManager = - TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), _tableConfig, _schema, _tableDataManager); + TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), _tableConfig, _schema, _tableDataManager, + null); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, true, "secondsSinceEpoch", _partitionUpsertMetadataManager, null); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index a2ac9d6c09..b72ef4679b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -82,7 +82,7 @@ public class MutableSegmentImplUpsertTest { File jsonFile = new File(dataResourceUrl.getFile()); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), tableConfig, schema, - mock(TableDataManager.class)); + mock(TableDataManager.class), null); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); _mutableSegmentImpl = MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, true, TIME_COLUMN, _partitionUpsertMetadataManager, diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java index eb7c77ade9..4dfb248502 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java @@ -65,11 +65,11 @@ public class TableUpsertMetadataManagerFactoryTest { when(tableDataManager.getTableDataDir()).thenReturn(new File(RAW_TABLE_NAME)); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), createTableConfig(upsertConfig), SCHEMA, - tableDataManager); + tableDataManager, null); assertNotNull(tableUpsertMetadataManager); assertTrue(tableUpsertMetadataManager instanceof ConcurrentMapTableUpsertMetadataManager); - assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager( - 0) instanceof ConcurrentMapPartitionUpsertMetadataManager); + assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0) + instanceof ConcurrentMapPartitionUpsertMetadataManager); } @Test @@ -82,11 +82,11 @@ public class TableUpsertMetadataManagerFactoryTest { when(tableDataManager.getTableDataDir()).thenReturn(new File(RAW_TABLE_NAME)); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), createTableConfig(upsertConfig), SCHEMA, - tableDataManager); + tableDataManager, null); assertNotNull(tableUpsertMetadataManager); assertTrue(tableUpsertMetadataManager instanceof ConcurrentMapTableUpsertMetadataManager); - assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager( - 0) instanceof ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes); + assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0) + instanceof ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes); } @SuppressWarnings("deprecation") @@ -139,7 +139,7 @@ public class TableUpsertMetadataManagerFactoryTest { TableDataManager tableDataManager, boolean expected) throws IOException { try (TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create( - instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager)) { + instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager, null)) { assertEquals(tableUpsertMetadataManager.getContext().isPreloadEnabled(), expected); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index a40414d25c..76d4a3a39b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -644,38 +644,17 @@ public abstract class BaseServerStarter implements ServiceStartable { ServerSegmentCompletionProtocolHandler.init( _serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER)); - int maxPreprocessConcurrency = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, - Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM)); - int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES, - Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)); - // Relax throttling until the server is ready to serve queries - SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler = - new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries, - false); - int maxStarTreePreprocessConcurrency = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, - Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM)); - int maxStarTreePreprocessConcurrencyBeforeServingQueries = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES, - Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)); - // Relax throttling until the server is ready to serve queries - SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler = - new SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency, - maxStarTreePreprocessConcurrencyBeforeServingQueries, false); - int maxDownloadConcurrency = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM, - Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM)); - int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt( - _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES)); - // Relax throttling until the server is ready to serve queries - SegmentDownloadThrottler segmentDownloadThrottler = - new SegmentDownloadThrottler(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, false); - _segmentOperationsThrottler = - new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler, - segmentDownloadThrottler); + if (_segmentOperationsThrottler == null) { + // Only create segment operation throttlers if null + SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler = + createSegmentAllIndexPreprocessThrottler(); + SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler = + createSegmentStarTreePreprocessThrottler(); + SegmentDownloadThrottler segmentDownloadThrottler = createSegmentDownloadThrottler(); + _segmentOperationsThrottler = + new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler, + segmentDownloadThrottler); + } SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf, _helixManager); ServerConf serverConf = new ServerConf(_serverConf); @@ -829,6 +808,42 @@ public abstract class BaseServerStarter implements ServiceStartable { } } + protected SegmentAllIndexPreprocessThrottler createSegmentAllIndexPreprocessThrottler() { + int maxPreprocessConcurrency = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, + Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM)); + int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES, + Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)); + // Relax throttling until the server is ready to serve queries + return new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency, + maxPreprocessConcurrencyBeforeServingQueries, false); + } + + protected SegmentStarTreePreprocessThrottler createSegmentStarTreePreprocessThrottler() { + int maxStarTreePreprocessConcurrency = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, + Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM)); + int maxStarTreePreprocessConcurrencyBeforeServingQueries = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES, + Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)); + // Relax throttling until the server is ready to serve queries + return new SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency, + maxStarTreePreprocessConcurrencyBeforeServingQueries, false); + } + + protected SegmentDownloadThrottler createSegmentDownloadThrottler() { + int maxDownloadConcurrency = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM, + Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM)); + int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt( + _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, + Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES)); + // Relax throttling until the server is ready to serve queries + return new SegmentDownloadThrottler(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, + false); + } + /** * Can be overridden to perform operations before server starts serving queries. */ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 86b8200dfc..137eaa9149 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -268,6 +268,7 @@ public class CommonConstants { // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(Integer.MAX_VALUE); + // Preprocess throttle config specifically for StarTree index rebuild public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = "pinot.server.max.segment.startree.preprocess.parallelism"; @@ -278,6 +279,8 @@ public class CommonConstants { // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(Integer.MAX_VALUE); + + // Download throttle config public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "pinot.server.max.segment.download.parallelism"; // Setting to Integer.MAX_VALUE to effectively disable throttling by default --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org