This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 763c732d4f Make SegmentOperationsThrottler more extensible and modify interfaces for upsert and dedup to take this as an argument (#15973)
763c732d4f is described below

commit 763c732d4fed14c5429ef99785ee47e0b0815319
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Tue Jun 10 10:50:36 2025 -0700

    Make SegmentOperationsThrottler more extensible and modify interfaces for upsert and dedup to take this as an argument (#15973)
---
 .../manager/realtime/RealtimeTableDataManager.java |  4 +-
 .../local/dedup/BaseTableDedupMetadataManager.java |  6 +-
 .../local/dedup/TableDedupMetadataManager.java     |  4 +-
 .../dedup/TableDedupMetadataManagerFactory.java    |  7 +-
 .../upsert/BaseTableUpsertMetadataManager.java     |  6 +-
 .../local/upsert/TableUpsertMetadataManager.java   |  4 +-
 .../upsert/TableUpsertMetadataManagerFactory.java  |  5 +-
 .../utils/BaseSegmentOperationsThrottler.java      | 47 ++++++++-----
 .../utils/SegmentAllIndexPreprocessThrottler.java  | 13 +++-
 .../local/utils/SegmentDownloadThrottler.java      | 17 ++---
 .../utils/SegmentStarTreePreprocessThrottler.java  | 11 ++-
 .../TableDedupMetadataManagerFactoryTest.java      |  2 +-
 .../mutable/MutableSegmentDedupTest.java           |  3 +-
 .../MutableSegmentImplUpsertComparisonColTest.java |  3 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |  2 +-
 .../TableUpsertMetadataManagerFactoryTest.java     | 14 ++--
 .../server/starter/helix/BaseServerStarter.java    | 79 +++++++++++++---------
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 +
 18 files changed, 149 insertions(+), 81 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 15ebdc56eb..004755f47c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,14 +207,14 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     if (tableConfig.isDedupEnabled()) {
       _tableDedupMetadataManager =
           
TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(),
 tableConfig, schema,
-              this);
+              this, _segmentOperationsThrottler);
     }
     if (tableConfig.isUpsertEnabled()) {
       Preconditions.checkState(_tableDedupMetadataManager == null,
           "Dedup and upsert cannot be both enabled for table: %s", 
_tableNameWithType);
       _tableUpsertMetadataManager =
           
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
 tableConfig, schema,
-              this);
+              this, _segmentOperationsThrottler);
     }
 
     _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index e2c8f986e2..61988fd4ad 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -40,11 +42,13 @@ public abstract class BaseTableDedupMetadataManager 
implements TableDedupMetadat
   protected final Map<Integer, PartitionDedupMetadataManager> 
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
   protected String _tableNameWithType;
   protected DedupContext _context;
+  protected SegmentOperationsThrottler _segmentOperationsThrottler;
 
   @Override
   public void init(PinotConfiguration instanceDedupConfig, TableConfig 
tableConfig, Schema schema,
-      TableDataManager tableDataManager) {
+      TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler 
segmentOperationsThrottler) {
     _tableNameWithType = tableConfig.getTableName();
+    _segmentOperationsThrottler = segmentOperationsThrottler;
 
     Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be 
enabled for table: %s",
         _tableNameWithType);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index edc971bea2..03007c9034 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.segment.local.dedup;
 
 import java.io.Closeable;
+import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -30,7 +32,7 @@ public interface TableDedupMetadataManager extends Closeable {
    * Initialize TableDedupMetadataManager.
    */
   void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, 
Schema schema,
-      TableDataManager tableDataManager);
+      TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler 
segmentOperationsThrottler);
 
   /**
    * Create a new PartitionDedupMetadataManager if not present already, 
otherwise return existing one.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
index 473da5e392..ae34d71464 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -37,7 +39,8 @@ public class TableDedupMetadataManagerFactory {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableDedupMetadataManagerFactory.class);
 
   public static TableDedupMetadataManager create(PinotConfiguration 
instanceDedupConfig, TableConfig tableConfig,
-      Schema schema, TableDataManager tableDataManager) {
+      Schema schema, TableDataManager tableDataManager,
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
     String tableNameWithType = tableConfig.getTableName();
     Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be 
enabled for table: %s", tableNameWithType);
     DedupConfig dedupConfig = tableConfig.getDedupConfig();
@@ -63,7 +66,7 @@ public class TableDedupMetadataManagerFactory {
       LOGGER.info("Creating ConcurrentMapTableDedupMetadataManager for table: 
{}", tableNameWithType);
       metadataManager = new ConcurrentMapTableDedupMetadataManager();
     }
-    metadataManager.init(instanceDedupConfig, tableConfig, schema, 
tableDataManager);
+    metadataManager.init(instanceDedupConfig, tableConfig, schema, 
tableDataManager, segmentOperationsThrottler);
     return metadataManager;
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 8d2318148e..539ae1a95c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -20,9 +20,11 @@ package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -38,11 +40,13 @@ public abstract class BaseTableUpsertMetadataManager 
implements TableUpsertMetad
 
   protected String _tableNameWithType;
   protected UpsertContext _context;
+  protected SegmentOperationsThrottler _segmentOperationsThrottler;
 
   @Override
   public void init(PinotConfiguration instanceUpsertConfig, TableConfig 
tableConfig, Schema schema,
-      TableDataManager tableDataManager) {
+      TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler 
segmentOperationsThrottler) {
     _tableNameWithType = tableConfig.getTableName();
+    _segmentOperationsThrottler = segmentOperationsThrottler;
 
     Preconditions.checkArgument(tableConfig.isUpsertEnabled(),
         "Upsert must be enabled for table: %s", _tableNameWithType);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index e4071fa08c..3a8012c539 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,8 +23,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -39,7 +41,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 public interface TableUpsertMetadataManager extends Closeable {
 
   void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig, 
Schema schema,
-      TableDataManager tableDataManager);
+      TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler 
segmentOperationsThrottler);
 
   PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 8d81b92588..1151220098 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.upsert;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -37,7 +38,7 @@ public class TableUpsertMetadataManagerFactory {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
   public static TableUpsertMetadataManager create(PinotConfiguration 
instanceUpsertConfig, TableConfig tableConfig,
-      Schema schema, TableDataManager tableDataManager) {
+      Schema schema, TableDataManager tableDataManager, 
SegmentOperationsThrottler segmentOperationsThrottler) {
     String tableNameWithType = tableConfig.getTableName();
     Preconditions.checkArgument(tableConfig.isUpsertEnabled(), "Upsert must be 
enabled for table: %s",
         tableNameWithType);
@@ -64,7 +65,7 @@ public class TableUpsertMetadataManagerFactory {
       LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table: 
{}", tableNameWithType);
       metadataManager = new ConcurrentMapTableUpsertMetadataManager();
     }
-    metadataManager.init(instanceUpsertConfig, tableConfig, schema, 
tableDataManager);
+    metadataManager.init(instanceUpsertConfig, tableConfig, schema, 
tableDataManager, segmentOperationsThrottler);
     return metadataManager;
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
index 4e8de0cdf8..717e19aab8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.slf4j.Logger;
@@ -45,8 +44,6 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
   protected int _maxConcurrency;
   protected int _maxConcurrencyBeforeServingQueries;
   protected boolean _isServingQueries;
-  protected ServerGauge _thresholdGauge;
-  protected ServerGauge _countGauge;
   private AtomicInteger _numSegmentsAcquiredSemaphore;
   private final Logger _logger;
 
@@ -55,12 +52,10 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
    * @param maxConcurrency configured concurrency
    * @param maxConcurrencyBeforeServingQueries configured concurrency before 
serving queries
    * @param isServingQueries whether the server is ready to serve queries or 
not
-   * @param thresholdGauge gauge metric to track the throttle thresholds
-   * @param countGauge gauge metric to track the number of segments undergoing 
the given operation
    * @param logger logger to use
    */
   public BaseSegmentOperationsThrottler(int maxConcurrency, int 
maxConcurrencyBeforeServingQueries,
-      boolean isServingQueries, ServerGauge thresholdGauge, ServerGauge 
countGauge, Logger logger) {
+      boolean isServingQueries, Logger logger) {
     _logger = logger;
     _logger.info("Initializing SegmentOperationsThrottler, maxConcurrency: {}, 
maxConcurrencyBeforeServingQueries: {}, "
             + "isServingQueries: {}",
@@ -72,8 +67,6 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
     _maxConcurrency = maxConcurrency;
     _maxConcurrencyBeforeServingQueries = maxConcurrencyBeforeServingQueries;
     _isServingQueries = isServingQueries;
-    _thresholdGauge = thresholdGauge;
-    _countGauge = countGauge;
 
     // maxConcurrencyBeforeServingQueries is only used prior to serving 
queries and once the server is
     // ready to serve queries this is not used again. This too is configurable 
via ZK CLUSTER config updates while the
@@ -90,6 +83,18 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
         availablePermits());
   }
 
+  /**
+   * Updates the throttle threshold metric
+   * @param value value to update the metric to
+   */
+  public abstract void updateThresholdMetric(int value);
+
+  /**
+   * Updates the throttle count metric
+   * @param value value to update the metric to
+   */
+  public abstract void updateCountMetric(int value);
+
   /**
    * The ServerMetrics may be created after these throttle objects are 
created. In that case, the initialization that
    * happens in the constructor may have occurred on the NOOP metrics. This 
should be called after the server metrics
@@ -99,8 +104,8 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
    */
   public void initializeMetrics() {
     _serverMetrics = ServerMetrics.get();
-    _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, 
_semaphore.getTotalPermits());
-    _serverMetrics.setValueOfGlobalGauge(_countGauge, 0);
+    updateThresholdMetric(_semaphore.getTotalPermits());
+    updateCountMetric(0);
   }
 
   public synchronized void startServingQueries() {
@@ -108,7 +113,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
         + "total permits: {}, available permits: {}", totalPermits(), 
availablePermits());
     _isServingQueries = true;
     _semaphore.setPermits(_maxConcurrency);
-    _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency);
+    updateThresholdMetric(_maxConcurrency);
     _logger.info("Reset throttling completed, new concurrency: {}, total 
permits: {}, available permits: {}",
         _maxConcurrency, totalPermits(), availablePermits());
   }
@@ -152,7 +157,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
       return;
     }
     _semaphore.setPermits(_maxConcurrency);
-    _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency);
+    updateThresholdMetric(_maxConcurrency);
     _logger.info("Updated total permits: {}", totalPermits());
   }
 
@@ -193,7 +198,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
     if (!_isServingQueries) {
       _logger.info("config: {} was updated before serving queries was enabled, 
updating the permits", configName);
       _semaphore.setPermits(_maxConcurrencyBeforeServingQueries);
-      _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, 
_maxConcurrencyBeforeServingQueries);
+      updateThresholdMetric(_maxConcurrencyBeforeServingQueries);
       _logger.info("Updated total permits: {}", totalPermits());
     }
   }
@@ -209,7 +214,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
   public void acquire()
       throws InterruptedException {
     _semaphore.acquire();
-    _serverMetrics.setValueOfGlobalGauge(_countGauge, 
_numSegmentsAcquiredSemaphore.incrementAndGet());
+    updateCountMetric(_numSegmentsAcquiredSemaphore.incrementAndGet());
   }
 
   /**
@@ -218,7 +223,15 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
    */
   public void release() {
     _semaphore.release();
-    _serverMetrics.setValueOfGlobalGauge(_countGauge, 
_numSegmentsAcquiredSemaphore.decrementAndGet());
+    updateCountMetric(_numSegmentsAcquiredSemaphore.decrementAndGet());
+  }
+
+  /**
+   * Get the estimated number of threads waiting for the semaphore
+   * @return the estimated queue length
+   */
+  public int getQueueLength() {
+    return _semaphore.getQueueLength();
   }
 
   /**
@@ -226,7 +239,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
    * @return number of available permits
    */
   @VisibleForTesting
-  protected int availablePermits() {
+  public int availablePermits() {
     return _semaphore.availablePermits();
   }
 
@@ -235,7 +248,7 @@ public abstract class BaseSegmentOperationsThrottler 
implements PinotClusterConf
    * @return total number of permits
    */
   @VisibleForTesting
-  protected int totalPermits() {
+  public int totalPermits() {
     return _semaphore.getTotalPermits();
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
index 8a9a5251eb..6243ba5d44 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
@@ -40,8 +40,7 @@ public class SegmentAllIndexPreprocessThrottler extends 
BaseSegmentOperationsThr
    */
   public SegmentAllIndexPreprocessThrottler(int maxPreprocessConcurrency,
       int maxPreprocessConcurrencyBeforeServingQueries, boolean 
isServingQueries) {
-    super(maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries,
-        ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD, 
ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT, LOGGER);
+    super(maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries, LOGGER);
   }
 
   @Override
@@ -60,4 +59,14 @@ public class SegmentAllIndexPreprocessThrottler extends 
BaseSegmentOperationsThr
         
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES);
     LOGGER.info("Updated SegmentAllIndexPreprocessThrottler configs with 
latest clusterConfigs");
   }
+
+  @Override
+  public void updateThresholdMetric(int value) {
+    
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD,
 value);
+  }
+
+  @Override
+  public void updateCountMetric(int value) {
+    
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT, 
value);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
index 1aec9fbaf7..906c58d013 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
@@ -49,8 +49,7 @@ public class SegmentDownloadThrottler extends 
BaseSegmentOperationsThrottler {
    */
   public SegmentDownloadThrottler(int maxDownloadConcurrency, int 
maxDownloadConcurrencyBeforeServingQueries,
       boolean isServingQueries) {
-    super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, 
isServingQueries,
-        ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD, 
ServerGauge.SEGMENT_DOWNLOAD_COUNT, LOGGER);
+    super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, 
isServingQueries, LOGGER);
   }
 
   @Override
@@ -70,11 +69,13 @@ public class SegmentDownloadThrottler extends 
BaseSegmentOperationsThrottler {
     LOGGER.info("Updated SegmentDownloadThrottler configs with latest 
clusterConfigs");
   }
 
-  /**
-   * Get the estimated number of threads waiting for the semaphore
-   * @return the estimated queue length
-   */
-  public int getQueueLength() {
-    return _semaphore.getQueueLength();
+  @Override
+  public void updateThresholdMetric(int value) {
+    
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD,
 value);
+  }
+
+  @Override
+  public void updateCountMetric(int value) {
+    _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_COUNT, 
value);
   }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
index 531e90b44f..cf3e6216d3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
@@ -42,7 +42,6 @@ public class SegmentStarTreePreprocessThrottler extends 
BaseSegmentOperationsThr
   public SegmentStarTreePreprocessThrottler(int 
maxStarTreePreprocessConcurrency,
       int maxStarTreePreprocessConcurrencyBeforeServingQueries, boolean 
isServingQueries) {
     super(maxStarTreePreprocessConcurrency, 
maxStarTreePreprocessConcurrencyBeforeServingQueries, isServingQueries,
-        ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD, 
ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT,
         LOGGER);
   }
 
@@ -62,4 +61,14 @@ public class SegmentStarTreePreprocessThrottler extends 
BaseSegmentOperationsThr
         
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES);
     LOGGER.info("Updated SegmentStarTreePreprocessThrottler configs with 
latest clusterConfigs");
   }
+
+  @Override
+  public void updateThresholdMetric(int value) {
+    
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD,
 value);
+  }
+
+  @Override
+  public void updateCountMetric(int value) {
+    
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT,
 value);
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
index 7c2f626ff7..02e82df143 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
@@ -86,7 +86,7 @@ public class TableDedupMetadataManagerFactoryTest {
       TableDataManager tableDataManager, boolean expected)
       throws IOException {
     try (TableDedupMetadataManager tableDedupMetadataManager = 
TableDedupMetadataManagerFactory.create(
-        instanceDedupConfig, tableConfig, schema, tableDataManager)) {
+        instanceDedupConfig, tableConfig, schema, tableDataManager, null)) {
       assertEquals(tableDedupMetadataManager.getContext().isPreloadEnabled(), 
expected);
     }
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
index 34d49ec1f2..a3ca692e0a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
@@ -119,7 +119,8 @@ public class MutableSegmentDedupTest implements 
PinotBuffersAfterMethodCheckRule
         .build();
     TableDataManager tableDataManager = mock(TableDataManager.class);
     when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR);
-    return TableDedupMetadataManagerFactory.create(new PinotConfiguration(), 
tableConfig, schema, tableDataManager);
+    return TableDedupMetadataManagerFactory.create(new PinotConfiguration(), 
tableConfig, schema, tableDataManager,
+        null);
   }
 
   public List<Map<String, String>> loadJsonFile(String filePath)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 378afbb6cd..1588de8f91 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -87,7 +87,8 @@ public class MutableSegmentImplUpsertComparisonColTest 
implements PinotBuffersAf
     _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
     TableUpsertMetadataManager tableUpsertMetadataManager =
-        TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), 
_tableConfig, _schema, _tableDataManager);
+        TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), 
_tableConfig, _schema, _tableDataManager,
+            null);
     _partitionUpsertMetadataManager = 
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl = 
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, true, 
"secondsSinceEpoch",
         _partitionUpsertMetadataManager, null);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a2ac9d6c09..b72ef4679b 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -82,7 +82,7 @@ public class MutableSegmentImplUpsertTest {
     File jsonFile = new File(dataResourceUrl.getFile());
     TableUpsertMetadataManager tableUpsertMetadataManager =
         TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), 
tableConfig, schema,
-            mock(TableDataManager.class));
+            mock(TableDataManager.class), null);
     _partitionUpsertMetadataManager = 
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, true, 
TIME_COLUMN, _partitionUpsertMetadataManager,
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
index eb7c77ade9..4dfb248502 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
@@ -65,11 +65,11 @@ public class TableUpsertMetadataManagerFactoryTest {
     when(tableDataManager.getTableDataDir()).thenReturn(new 
File(RAW_TABLE_NAME));
     TableUpsertMetadataManager tableUpsertMetadataManager =
         TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), 
createTableConfig(upsertConfig), SCHEMA,
-            tableDataManager);
+            tableDataManager, null);
     assertNotNull(tableUpsertMetadataManager);
     assertTrue(tableUpsertMetadataManager instanceof 
ConcurrentMapTableUpsertMetadataManager);
-    assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(
-        0) instanceof ConcurrentMapPartitionUpsertMetadataManager);
+    assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0)
+        instanceof ConcurrentMapPartitionUpsertMetadataManager);
   }
 
   @Test
@@ -82,11 +82,11 @@ public class TableUpsertMetadataManagerFactoryTest {
     when(tableDataManager.getTableDataDir()).thenReturn(new 
File(RAW_TABLE_NAME));
     TableUpsertMetadataManager tableUpsertMetadataManager =
         TableUpsertMetadataManagerFactory.create(new PinotConfiguration(), 
createTableConfig(upsertConfig), SCHEMA,
-            tableDataManager);
+            tableDataManager, null);
     assertNotNull(tableUpsertMetadataManager);
     assertTrue(tableUpsertMetadataManager instanceof 
ConcurrentMapTableUpsertMetadataManager);
-    assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(
-        0) instanceof 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes);
+    assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0)
+        instanceof 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes);
   }
 
   @SuppressWarnings("deprecation")
@@ -139,7 +139,7 @@ public class TableUpsertMetadataManagerFactoryTest {
       TableDataManager tableDataManager, boolean expected)
       throws IOException {
     try (TableUpsertMetadataManager tableUpsertMetadataManager = 
TableUpsertMetadataManagerFactory.create(
-        instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager)) {
+        instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager, null)) {
       assertEquals(tableUpsertMetadataManager.getContext().isPreloadEnabled(), 
expected);
     }
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index a40414d25c..76d4a3a39b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -644,38 +644,17 @@ public abstract class BaseServerStarter implements ServiceStartable {
     ServerSegmentCompletionProtocolHandler.init(
         
_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
 
-    int maxPreprocessConcurrency = Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
-            Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
-    int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
-            
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
-    // Relax throttling until the server is ready to serve queries
-    SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
-        new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency, 
maxPreprocessConcurrencyBeforeServingQueries,
-            false);
-    int maxStarTreePreprocessConcurrency = Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
-            Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
-    int maxStarTreePreprocessConcurrencyBeforeServingQueries = 
Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
-            
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
-    // Relax throttling until the server is ready to serve queries
-    SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
-        new 
SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
-            maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
-    int maxDownloadConcurrency = Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
-            Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
-    int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
-        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
-            
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
-    // Relax throttling until the server is ready to serve queries
-    SegmentDownloadThrottler segmentDownloadThrottler =
-        new SegmentDownloadThrottler(maxDownloadConcurrency, 
maxDownloadConcurrencyBeforeServingQueries, false);
-    _segmentOperationsThrottler =
-        new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, 
segmentStarTreePreprocessThrottler,
-            segmentDownloadThrottler);
+    if (_segmentOperationsThrottler == null) {
+      // Only create segment operation throttlers if null
+      SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
+          createSegmentAllIndexPreprocessThrottler();
+      SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
+          createSegmentStarTreePreprocessThrottler();
+      SegmentDownloadThrottler segmentDownloadThrottler = 
createSegmentDownloadThrottler();
+      _segmentOperationsThrottler =
+          new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, 
segmentStarTreePreprocessThrottler,
+              segmentDownloadThrottler);
+    }
 
     SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
     ServerConf serverConf = new ServerConf(_serverConf);
@@ -829,6 +808,42 @@ public abstract class BaseServerStarter implements ServiceStartable {
     }
   }
 
+  protected SegmentAllIndexPreprocessThrottler 
createSegmentAllIndexPreprocessThrottler() {
+    int maxPreprocessConcurrency = Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
+            Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
+    int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
+            
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
+    // Relax throttling until the server is ready to serve queries
+    return new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency,
+        maxPreprocessConcurrencyBeforeServingQueries, false);
+  }
+
+  protected SegmentStarTreePreprocessThrottler 
createSegmentStarTreePreprocessThrottler() {
+    int maxStarTreePreprocessConcurrency = Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
+            Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
+    int maxStarTreePreprocessConcurrencyBeforeServingQueries = 
Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
+            
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
+    // Relax throttling until the server is ready to serve queries
+    return new 
SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
+        maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
+  }
+
+  protected SegmentDownloadThrottler createSegmentDownloadThrottler() {
+    int maxDownloadConcurrency = Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
+            Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
+    int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
+        
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
+            
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
+    // Relax throttling until the server is ready to serve queries
+    return new SegmentDownloadThrottler(maxDownloadConcurrency, 
maxDownloadConcurrencyBeforeServingQueries,
+        false);
+  }
+
   /**
    * Can be overridden to perform operations before server starts serving queries.
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 86b8200dfc..137eaa9149 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -268,6 +268,7 @@ public class CommonConstants {
     // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default
     public static final String 
DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
         String.valueOf(Integer.MAX_VALUE);
+
     // Preprocess throttle config specifically for StarTree index rebuild
     public static final String 
CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM =
         "pinot.server.max.segment.startree.preprocess.parallelism";
@@ -278,6 +279,8 @@ public class CommonConstants {
     // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default
     public static final String 
DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
         String.valueOf(Integer.MAX_VALUE);
+
+    // Download throttle config
     public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM =
         "pinot.server.max.segment.download.parallelism";
     // Setting to Integer.MAX_VALUE to effectively disable throttling by default


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to