This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d47ea27b4ee Make query routing strategy pluggable (#17364)
d47ea27b4ee is described below

commit d47ea27b4ee724ea4b52aa07db2ef22cfd5071fd
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Dec 15 17:29:35 2025 -0800

    Make query routing strategy pluggable (#17364)
---
 .../pinot/broker/routing/BrokerRoutingManager.java |   5 +-
 .../instanceselector/BalancedInstanceSelector.java |  13 +-
 .../instanceselector/BaseInstanceSelector.java     |  42 +++---
 .../routing/instanceselector/InstanceSelector.java |  11 +-
 .../instanceselector/InstanceSelectorFactory.java  |  86 ++++++++---
 .../MultiStageReplicaGroupSelector.java            |  18 ++-
 .../ReplicaGroupInstanceSelector.java              | 130 +++++++++++++++--
 .../StrictReplicaGroupInstanceSelector.java        | 129 +----------------
 .../instanceselector/InstanceSelectorTest.java     | 157 +++++++++++----------
 .../MultiStageReplicaGroupSelectorTest.java        |  34 +++--
 .../pinot/spi/config/table/RoutingConfig.java      |   3 +-
 11 files changed, 342 insertions(+), 286 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 3bc06585e44..fa1d66c1a63 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -709,9 +709,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
           
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
       InstanceSelector instanceSelector =
           InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
-              adaptiveServerSelector, _pinotConfig);
-      instanceSelector.init(_routableServers, _enabledServerInstanceMap, 
idealState, externalView,
-          preSelectedOnlineSegments);
+              adaptiveServerSelector, _pinotConfig, _routableServers, 
_enabledServerInstanceMap, idealState,
+              externalView, preSelectedOnlineSegments);
 
       // Add time boundary manager if both offline and real-time part exist 
for a hybrid table
       TimeBoundaryManager timeBoundaryManager = null;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 4afd3a26dcd..63e80f9e3e7 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -18,15 +18,10 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
-import java.time.Clock;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -49,14 +44,8 @@ import org.apache.pinot.common.utils.HashUtil;
 ///
 public class BalancedInstanceSelector extends BaseInstanceSelector {
 
-  public BalancedInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      InstanceSelectorConfig config) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, config);
-  }
-
   @Override
-  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, 
int requestId,
+  public Pair<Map<String, String>, Map<String, String>> select(List<String> 
segments, int requestId,
       SegmentStates segmentStates, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     // No need to adjust this map per total segment numbers, as optional 
segments should be empty most of the time.
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 5acd67f25b5..45f95f7e41b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -81,22 +82,23 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID
 /// creation time more than 5 minutes ago).
 /// TODO: refresh new/old segment state where there is no update from helix 
for long time.
 ///
-abstract class BaseInstanceSelector implements InstanceSelector {
+public abstract class BaseInstanceSelector implements InstanceSelector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseInstanceSelector.class);
   // To prevent int overflow, reset the request id once it reaches this value
   private static final long MAX_REQUEST_ID = 1_000_000_000;
 
-  final String _tableNameWithType;
-  final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  final BrokerMetrics _brokerMetrics;
-  final AdaptiveServerSelector _adaptiveServerSelector;
+  protected TableConfig _tableConfig;
+  protected String _tableNameWithType;
+  protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  protected BrokerMetrics _brokerMetrics;
+  protected AdaptiveServerSelector _adaptiveServerSelector;
   // Will be null if and only if adaptiveServerSelector is null
-  final PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
-  final Clock _clock;
-  final InstanceSelectorConfig _config;
-  final long _newSegmentExpirationTimeInSeconds;
-  final boolean _emitSinglePoolSegmentsMetric;
-  final int _tableNameHashForFixedReplicaRouting;
+  protected PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
+  protected Clock _clock;
+  protected InstanceSelectorConfig _config;
+  protected long _newSegmentExpirationTimeInSeconds;
+  protected boolean _emitSinglePoolSegmentsMetric;
+  protected int _tableNameHashForFixedReplicaRouting;
 
   // These 3 variables are the cached states to help accelerate the change 
processing
   Set<String> _enabledInstances;
@@ -109,10 +111,13 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   private volatile SegmentStates _segmentStates;
   private Map<String, ServerInstance> _enabledServerStore;
 
-  BaseInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> 
propertyStore,
       BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      InstanceSelectorConfig config) {
-    _tableNameWithType = tableNameWithType;
+      InstanceSelectorConfig config, Set<String> enabledInstances, Map<String, 
ServerInstance> enabledServerMap,
+      IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments) {
+    _tableConfig = tableConfig;
+    _tableNameWithType = tableConfig.getTableName();
     _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
@@ -124,7 +129,7 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
     // instance
     // Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF 
to get a positive value
     _tableNameHashForFixedReplicaRouting =
-        TableNameBuilder.extractRawTableName(tableNameWithType).hashCode() & 
0x7FFFFFFF;
+        TableNameBuilder.extractRawTableName(_tableNameWithType).hashCode() & 
0x7FFFFFFF;
 
     _priorityPoolInstanceSelector =
         _adaptiveServerSelector == null ? null : new 
PriorityPoolInstanceSelector(_adaptiveServerSelector);
@@ -132,11 +137,6 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
       throw new IllegalArgumentException(
           "AdaptiveServerSelector and consistent routing cannot be enabled at 
the same time");
     }
-  }
-
-  @Override
-  public void init(Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap, IdealState idealState,
-      ExternalView externalView, Set<String> onlineSegments) {
     _enabledInstances = enabledInstances;
     _enabledServerStore = enabledServerMap;
     Map<String, Long> newSegmentCreationTimeMap =
@@ -494,6 +494,6 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
    * segments are used to get the new segments that are not online yet. 
Instead of simply skipping them by broker at
    * routing time, we can send them to servers and let servers decide how to 
handle them.
    */
-  abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/> 
select(List<String> segments,
+  protected abstract Pair<Map<String, String>, Map<String, String>/*optional 
segments*/> select(List<String> segments,
       int requestId, SegmentStates segmentStates, Map<String, String> 
queryOptions);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index c1eb1cf77e5..25a54cd7fd7 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -18,16 +18,23 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import java.time.Clock;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
 
 
 /**
@@ -49,7 +56,9 @@ public interface InstanceSelector {
    * (segments with ONLINE/CONSUMING instances in the ideal state and 
pre-selected by the {@link SegmentPreSelector}).
    * Should be called only once before calling other methods.
    */
-  void init(Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap,
+  void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> 
propertyStore,
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      InstanceSelectorConfig config, Set<String> enabledInstances, Map<String, 
ServerInstance> enabledServerMap,
       IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments);
 
   /**
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index f49de38ac57..49536b93a8f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -20,15 +20,21 @@ package org.apache.pinot.broker.routing.instanceselector;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
+import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,20 +51,27 @@ public class InstanceSelectorFactory {
 
   @VisibleForTesting
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics, PinotConfiguration brokerConfig) {
-    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC(), brokerConfig);
+      ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics, PinotConfiguration brokerConfig,
+      Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap, IdealState idealState,
+      ExternalView externalView, Set<String> onlineSegments) {
+    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC(), brokerConfig,
+        enabledInstances, enabledServerMap, idealState, externalView, 
onlineSegments);
   }
 
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector, 
PinotConfiguration brokerConfig) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, 
PinotConfiguration brokerConfig,
+      Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap, IdealState idealState,
+      ExternalView externalView, Set<String> onlineSegments) {
     return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
adaptiveServerSelector, Clock.systemUTC(),
-        brokerConfig);
+        brokerConfig, enabledInstances, enabledServerMap, idealState, 
externalView, onlineSegments);
   }
 
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, 
PinotConfiguration brokerConfig) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, 
PinotConfiguration brokerConfig,
+      Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap, IdealState idealState,
+      ExternalView externalView, Set<String> onlineSegments) {
     String tableNameWithType = tableConfig.getTableName();
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
     boolean useFixedReplica = 
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
@@ -74,29 +87,58 @@ public class InstanceSelectorFactory {
             CommonConstants.Broker.DEFAULT_ENABLE_SINGLE_POOL_SEGMENTS_METRIC);
     InstanceSelectorConfig config = new 
InstanceSelectorConfig(useFixedReplica, newSegmentExpirationTimeInSeconds,
             emitSinglePoolSegmentsMetric);
+
+    InstanceSelector instanceSelector = null;
     if (routingConfig != null) {
-      if 
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
-          || (tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
+      if ((tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
           routingConfig.getRoutingTableBuilderName())) || 
(tableConfig.getTableType() == TableType.REALTIME
           && 
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
 {
         LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
-        return new ReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics, adaptiveServerSelector,
-            clock, config);
-      }
-      if 
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
-          routingConfig.getInstanceSelectorType())) {
-        LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
-        return new StrictReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock, config);
+        instanceSelector = new ReplicaGroupInstanceSelector();
       }
-      if 
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
-          routingConfig.getInstanceSelectorType())) {
-        LOGGER.info("Using {} for table: {}", 
routingConfig.getInstanceSelectorType(), tableNameWithType);
-        return new MultiStageReplicaGroupSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock, config);
+
+      if (routingConfig.getInstanceSelectorType() != null) {
+        switch (routingConfig.getInstanceSelectorType()) {
+          case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: {
+            LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
+            instanceSelector = new ReplicaGroupInstanceSelector();
+            break;
+          }
+          case RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: {
+            LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: 
{}", tableNameWithType);
+            instanceSelector = new StrictReplicaGroupInstanceSelector();
+            break;
+          }
+          case RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE: {
+            LOGGER.info("Using {} for table: {}", 
routingConfig.getInstanceSelectorType(), tableNameWithType);
+            instanceSelector = new MultiStageReplicaGroupSelector();
+            break;
+          }
+          case RoutingConfig.BALANCED_INSTANCE_SELECTOR_TYPE: {
+            LOGGER.info("Using BalancedInstanceSelector for table: {}", 
tableNameWithType);
+            instanceSelector = new BalancedInstanceSelector();
+            break;
+          }
+          default: {
+            // Try to load custom instance selector
+            try {
+              instanceSelector = 
PluginManager.get().createInstance(routingConfig.getInstanceSelectorType());
+              break;
+            } catch (Exception e) {
+              LOGGER.error("Failed to create instance selector for table: {}", 
tableNameWithType, e);
+              throw new RuntimeException(e);
+            }
+          }
+        }
       }
     }
-    return new BalancedInstanceSelector(tableNameWithType, propertyStore, 
brokerMetrics, adaptiveServerSelector, clock,
-       config);
+
+    if (instanceSelector == null) {
+      instanceSelector = new BalancedInstanceSelector();
+    }
+
+    instanceSelector.init(tableConfig, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock,
+        config, enabledInstances, enabledServerMap, idealState, externalView, 
onlineSegments);
+    return instanceSelector;
   }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 83cad54e44a..7c8a74f8afc 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -62,16 +63,13 @@ public class MultiStageReplicaGroupSelector extends 
BaseInstanceSelector {
 
   private volatile InstancePartitions _instancePartitions;
 
-  public MultiStageReplicaGroupSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      InstanceSelectorConfig config) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, config);
-  }
-
   @Override
-  public void init(Set<String> enabledInstances, Map<String, ServerInstance> 
enabledServerMap, IdealState idealState,
-      ExternalView externalView, Set<String> onlineSegments) {
-    super.init(enabledInstances, enabledServerMap, idealState, externalView, 
onlineSegments);
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> 
propertyStore,
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      InstanceSelectorConfig config, Set<String> enabledInstances, Map<String, 
ServerInstance> enabledServerMap,
+      IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments) {
+    super.init(tableConfig, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, config, enabledInstances,
+        enabledServerMap, idealState, externalView, onlineSegments);
     _instancePartitions = getInstancePartitions();
   }
 
@@ -88,7 +86,7 @@ public class MultiStageReplicaGroupSelector extends 
BaseInstanceSelector {
   }
 
   @Override
-  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, 
int requestId,
+  public Pair<Map<String, String>, Map<String, String>> select(List<String> 
segments, int requestId,
       SegmentStates segmentStates, Map<String, String> queryOptions) {
     // Create a copy of InstancePartitions to avoid race-condition with 
event-listeners above.
     InstancePartitions instancePartitions = _instancePartitions;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 88793eac7cb..ab14a4d9bbc 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -18,23 +18,26 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
-import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -65,14 +68,10 @@ import 
org.apache.pinot.common.utils.config.QueryOptionsUtils;
  */
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
-  public ReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      InstanceSelectorConfig config) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, config);
-  }
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReplicaGroupInstanceSelector.class);
 
   @Override
-  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, 
int requestId,
+  public Pair<Map<String, String>, Map<String, String>> select(List<String> 
segments, int requestId,
       SegmentStates segmentStates, Map<String, String> queryOptions) {
     ServerSelectionContext ctx = new ServerSelectionContext(queryOptions, 
_config);
     if (_adaptiveServerSelector != null) {
@@ -166,4 +165,115 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
     }
     return new ArrayList<>(candidateServers.values());
   }
+
+  @Override
+  void updateSegmentMaps(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
+      Map<String, Long> newSegmentCreationTimeMap) {
+    if (_tableConfig.isUpsertEnabled() || _tableConfig.isDedupEnabled()) {
+      updateSegmentMapsForUpsertTable(idealState, externalView, 
onlineSegments, newSegmentCreationTimeMap);
+    } else {
+      super.updateSegmentMaps(idealState, externalView, onlineSegments, 
newSegmentCreationTimeMap);
+    }
+  }
+
+  /**
+   *
+   * <pre>
+   * Instances unavailable for any old segment should not exist in 
_oldSegmentCandidatesMap or _newSegmentStateMap for
+   * segments with the same instances in ideal state.
+   *
+   * The maps are calculated in the following steps to meet the strict 
replica-group guarantee:
+   *   1. Compute the online instances for both old and new segments
+   *   2. Compare online instances for old segments with instances in ideal 
state and gather the unavailable instances
+   *   for each set of instances
+   *   3. Exclude the unavailable instances from the online instances map for 
both old and new segment map
+   * </pre>
+   */
+  void updateSegmentMapsForUpsertTable(IdealState idealState, ExternalView 
externalView, Set<String> onlineSegments,
+      Map<String, Long> newSegmentCreationTimeMap) {
+    _oldSegmentCandidatesMap.clear();
+    int newSegmentMapCapacity = 
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
+    _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
+
+    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
+    Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
+
+    // Get the online instances for the segments
+    Map<String, Set<String>> oldSegmentToOnlineInstancesMap =
+        new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
+    Map<String, Set<String>> newSegmentToOnlineInstancesMap = new 
HashMap<>(newSegmentMapCapacity);
+    for (String segment : onlineSegments) {
+      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      assert idealStateInstanceStateMap != null;
+      Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
+      Set<String> onlineInstances;
+      if (externalViewInstanceStateMap == null) {
+        onlineInstances = Collections.emptySet();
+      } else {
+        onlineInstances = getOnlineInstances(idealStateInstanceStateMap, 
externalViewInstanceStateMap);
+      }
+      if (newSegmentCreationTimeMap.containsKey(segment)) {
+        newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
+      } else {
+        oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
+      }
+    }
+
+    // Calculate the unavailable instances based on the old segments' online 
instances for each combination of instances
+    // in the ideal state
+    Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> onlineInstances = entry.getValue();
+      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
+      Set<String> unavailableInstances =
+          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> 
new HashSet<>());
+      for (String instance : instancesInIdealState) {
+        if (!onlineInstances.contains(instance)) {
+          if (unavailableInstances.add(instance)) {
+            LOGGER.warn(
+                "Found unavailable instance: {} in instance group: {} for 
segment: {}, table: {} (IS: {}, EV: {})",
+                instance, instancesInIdealState, segment, _tableNameWithType, 
idealStateInstanceStateMap,
+                externalViewAssignment.get(segment));
+          }
+        }
+      }
+    }
+
+    // Iterate over the maps and exclude the unavailable instances
+    for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
+      Set<String> onlineInstances = entry.getValue();
+      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Set<String> unavailableInstances = 
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
+      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
+      for (String instance : onlineInstances) {
+        if (!unavailableInstances.contains(instance)) {
+          candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance)));
+        }
+      }
+      _oldSegmentCandidatesMap.put(segment, candidates);
+    }
+
+    for (Map.Entry<String, Set<String>> entry : 
newSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> onlineInstances = entry.getValue();
+      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
+      Set<String> unavailableInstances =
+          
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
+      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
+      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+        if (!unavailableInstances.contains(instance)) {
+          candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance), getPool(instance)));
+        }
+      }
+      _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: 
{}", _newSegmentStateMap.keySet(),
+          _oldSegmentCandidatesMap.keySet());
+    }
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 0cb56c5be95..04e3b25b359 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -18,25 +18,10 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.utils.HashUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 /**
  * Instance selector for strict replica-group routing strategy.
@@ -45,9 +30,10 @@ import org.slf4j.LoggerFactory;
  * The strict replica-group routing strategy always routes the query to the 
instances within the same replica-group.
  * (Note that the replica-group information is derived from the ideal state of 
the table, where the instances are sorted
  * alphabetically in the instance state map, so the replica-groups in the 
instance selector might not match the
- * replica-groups in the instance partitions.) The instances in a 
replica-group should have all the online segments
- * (segments with ONLINE/CONSUMING instances in the ideal state and selected 
by the pre-selector) available
- * (ONLINE/CONSUMING in the external view) in order to serve queries. If any 
segment is unavailable in the
+ * replica-groups in the instance partitions). The goal of this algorithm is 
to ensure that segments from the same
+ * partition are never served from multiple different instances. The instances 
in a replica-group should have all the
+ * online segments (segments with ONLINE/CONSUMING instances in the ideal 
state and selected by the pre-selector)
+ * available (ONLINE/CONSUMING in the external view) in order to serve 
queries. If any segment is unavailable in the
  * replica-group, we mark the whole replica-group down and not serve queries 
with this replica-group.
  *
  * The selection algorithm is the same as {@link 
ReplicaGroupInstanceSelector}, and will always evenly distribute the
@@ -68,115 +54,10 @@ import org.slf4j.LoggerFactory;
  * </pre>
  */
 public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
-
-  public StrictReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
-      InstanceSelectorConfig config) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, config);
-  }
 
-  /**
-   * {@inheritDoc}
-   *
-   * <pre>
-   * Instances unavailable for any old segment should not exist in 
_oldSegmentCandidatesMap or _newSegmentStateMap for
-   * segments with the same instances in ideal state.
-   *
-   * The maps are calculated in the following steps to meet the strict 
replica-group guarantee:
-   *   1. Compute the online instances for both old and new segments
-   *   2. Compare online instances for old segments with instances in ideal 
state and gather the unavailable instances
-   *   for each set of instances
-   *   3. Exclude the unavailable instances from the online instances map for 
both old and new segment map
-   * </pre>
-   */
   @Override
   void updateSegmentMaps(IdealState idealState, ExternalView externalView, 
Set<String> onlineSegments,
       Map<String, Long> newSegmentCreationTimeMap) {
-    _oldSegmentCandidatesMap.clear();
-    int newSegmentMapCapacity = 
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
-    _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
-
-    Map<String, Map<String, String>> idealStateAssignment = 
idealState.getRecord().getMapFields();
-    Map<String, Map<String, String>> externalViewAssignment = 
externalView.getRecord().getMapFields();
-
-    // Get the online instances for the segments
-    Map<String, Set<String>> oldSegmentToOnlineInstancesMap =
-        new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
-    Map<String, Set<String>> newSegmentToOnlineInstancesMap = new 
HashMap<>(newSegmentMapCapacity);
-    for (String segment : onlineSegments) {
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      assert idealStateInstanceStateMap != null;
-      Map<String, String> externalViewInstanceStateMap = 
externalViewAssignment.get(segment);
-      Set<String> onlineInstances;
-      if (externalViewInstanceStateMap == null) {
-        onlineInstances = Collections.emptySet();
-      } else {
-        onlineInstances = getOnlineInstances(idealStateInstanceStateMap, 
externalViewInstanceStateMap);
-      }
-      if (newSegmentCreationTimeMap.containsKey(segment)) {
-        newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
-      } else {
-        oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
-      }
-    }
-
-    // Calculate the unavailable instances based on the old segments' online 
instances for each combination of instances
-    // in the ideal state
-    Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
-    for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
-      Set<String> unavailableInstances =
-          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> 
new HashSet<>());
-      for (String instance : instancesInIdealState) {
-        if (!onlineInstances.contains(instance)) {
-          if (unavailableInstances.add(instance)) {
-            LOGGER.warn(
-                "Found unavailable instance: {} in instance group: {} for 
segment: {}, table: {} (IS: {}, EV: {})",
-                instance, instancesInIdealState, segment, _tableNameWithType, 
idealStateInstanceStateMap,
-                externalViewAssignment.get(segment));
-          }
-        }
-      }
-    }
-
-    // Iterate over the maps and exclude the unavailable instances
-    for (Map.Entry<String, Set<String>> entry : 
oldSegmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
-      Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> unavailableInstances = 
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
-      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(onlineInstances.size());
-      for (String instance : onlineInstances) {
-        if (!unavailableInstances.contains(instance)) {
-          candidates.add(new SegmentInstanceCandidate(instance, true, 
getPool(instance)));
-        }
-      }
-      _oldSegmentCandidatesMap.put(segment, candidates);
-    }
-
-    for (Map.Entry<String, Set<String>> entry : 
newSegmentToOnlineInstancesMap.entrySet()) {
-      String segment = entry.getKey();
-      Set<String> onlineInstances = entry.getValue();
-      Map<String, String> idealStateInstanceStateMap = 
idealStateAssignment.get(segment);
-      Set<String> unavailableInstances =
-          
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), 
Collections.emptySet());
-      List<SegmentInstanceCandidate> candidates = new 
ArrayList<>(idealStateInstanceStateMap.size());
-      for (String instance : 
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
-        if (!unavailableInstances.contains(instance)) {
-          candidates.add(new SegmentInstanceCandidate(instance, 
onlineInstances.contains(instance),
-              getPool(instance)));
-        }
-      }
-      _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
-    }
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: 
{}", _newSegmentStateMap.keySet(),
-          _oldSegmentCandidatesMap.keySet());
-    }
+    super.updateSegmentMapsForUpsertTable(idealState, externalView, 
onlineSegments, newSegmentCreationTimeMap);
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 2ec94332fc3..668332e0f68 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -158,11 +158,13 @@ public class InstanceSelectorTest {
         STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
   }
 
-  private InstanceSelector createTestInstanceSelector(String selectorType) {
+  private InstanceSelector createTestInstanceSelector(String selectorType, 
Set<String> enabledInstances,
+      IdealState idealState, ExternalView externalView, Set<String> 
onlineSegments) {
     RoutingConfig config = new RoutingConfig(null, null, selectorType, false);
     when(_tableConfig.getRoutingConfig()).thenReturn(config);
     return InstanceSelectorFactory.getInstanceSelector(_tableConfig, 
_propertyStore, _brokerMetrics, null,
-        _mutableClock, new PinotConfiguration());
+        _mutableClock, new PinotConfiguration(), enabledInstances, 
EMPTY_SERVER_MAP, idealState, externalView,
+        onlineSegments);
   }
 
   @DataProvider(name = "selectorType")
@@ -196,37 +198,49 @@ public class InstanceSelectorTest {
     when(tableConfig.getTableName()).thenReturn("testTable_OFFLINE");
 
     // Routing config is missing
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof BalancedInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof BalancedInstanceSelector);
 
     // Instance selector type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof BalancedInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof BalancedInstanceSelector);
 
     // Replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof ReplicaGroupInstanceSelector);
 
     // Strict replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof 
StrictReplicaGroupInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof StrictReplicaGroupInstanceSelector);
 
     // Should be backward-compatible with legacy config
     when(routingConfig.getInstanceSelectorType()).thenReturn(null);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof ReplicaGroupInstanceSelector);
     when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
-        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(
+        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics, new PinotConfiguration(),
+            Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new 
ExternalView("testTable_OFFLINE"),
+            Set.of()) instanceof ReplicaGroupInstanceSelector);
   }
 
   @Test
@@ -234,15 +248,9 @@ public class InstanceSelectorTest {
     String offlineTableName = "testTable_OFFLINE";
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
-    BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
-    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
-    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
-        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    BalancedInstanceSelector balancedInstanceSelector = new 
BalancedInstanceSelector();
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new 
ReplicaGroupInstanceSelector();
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector = 
new StrictReplicaGroupInstanceSelector();
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -306,10 +314,12 @@ public class InstanceSelectorTest {
     onlineSegments.add(segment3);
     List<String> segments = Arrays.asList(segment0, segment1, segment2, 
segment3);
 
-    balancedInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
-    replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
-    strictReplicaGroupInstanceSelector.init(enabledInstances, 
EMPTY_SERVER_MAP, idealState, externalView,
-        onlineSegments);
+    balancedInstanceSelector.init(_tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    replicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    strictReplicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
 
     int requestId = 0;
 
@@ -762,9 +772,7 @@ public class InstanceSelectorTest {
     when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
-    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new 
ReplicaGroupInstanceSelector();
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -799,7 +807,8 @@ public class InstanceSelectorTest {
       onlineSegments.add(segment);
     }
 
-    replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    replicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
     //   ReplicaGroupInstanceSelector
     //     segment0 -> instance0
     //     segment2 -> instance0
@@ -845,9 +854,7 @@ public class InstanceSelectorTest {
     when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
-    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new 
ReplicaGroupInstanceSelector();
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -883,7 +890,8 @@ public class InstanceSelectorTest {
       onlineSegments.add(segment);
     }
 
-    replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    replicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
     //   ReplicaGroupInstanceSelector
     //     segment0 -> instance0
     //     segment3 -> instance0
@@ -928,9 +936,7 @@ public class InstanceSelectorTest {
     when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
-    ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new 
ReplicaGroupInstanceSelector();
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -966,7 +972,8 @@ public class InstanceSelectorTest {
       onlineSegments.add(segment);
     }
 
-    replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    replicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
     // since numReplicaGroupsToQuery is not set, first query should go to 
first replica group,
     // 2nd query should go to next replica group
 
@@ -989,13 +996,9 @@ public class InstanceSelectorTest {
     String offlineTableName = "testTable_OFFLINE";
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
-    BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    BalancedInstanceSelector balancedInstanceSelector = new 
BalancedInstanceSelector();
     // ReplicaGroupInstanceSelector has the same behavior as 
BalancedInstanceSelector for the unavailable segments
-    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
-        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector = 
new StrictReplicaGroupInstanceSelector();
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -1036,9 +1039,10 @@ public class InstanceSelectorTest {
     //     (disabled) errorInstance: ERROR
     //   }
     // }
-    balancedInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
-    strictReplicaGroupInstanceSelector.init(enabledInstances, 
EMPTY_SERVER_MAP, idealState, externalView,
-        onlineSegments);
+    balancedInstanceSelector.init(_tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
+    strictReplicaGroupInstanceSelector.init(_tableConfig, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
     BrokerRequest brokerRequest = mock(BrokerRequest.class);
     PinotQuery pinotQuery = mock(PinotQuery.class);
     when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
@@ -1310,9 +1314,8 @@ public class InstanceSelectorTest {
             List.of(Pair.of(instance1, ONLINE)));
 
     ExternalView externalView = createExternalView(externalViewMap);
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     {
       int requestId = 0;
@@ -1356,7 +1359,8 @@ public class InstanceSelectorTest {
     // Advance the clock to make newSeg to old segment.
     _mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS 
+ 10));
     // Upon re-initialization, newly old segments can only be served from 
online instances: instance1
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    selector.init(_tableConfig, _propertyStore, _brokerMetrics, null, 
_mutableClock,
+        INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP, 
idealState, externalView, onlineSegments);
     {
       int requestId = 0;
       InstanceSelector.SelectionResult selectionResult =
@@ -1423,8 +1427,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
     // We don't mark segment as unavailable.
     int requestId = 0;
     Map<String, String> expectedResult = Map.of(oldSeg, instance0);
@@ -1436,7 +1440,7 @@ public class InstanceSelectorTest {
 
     // Advance the clock to make newSeg to old segment and we see newSeg is 
reported as unavailable segment.
     _mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS 
+ 10));
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    selector = createTestInstanceSelector(selectorType, enabledInstances, 
idealState, externalView, onlineSegments);
     selectionResult = selector.select(_brokerRequest, 
Lists.newArrayList(onlineSegments), requestId);
     if (STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equals(selectorType)) {
       expectedResult = Map.of();
@@ -1483,8 +1487,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // We don't mark segment as unavailable.
     int requestId = 0;
@@ -1564,8 +1568,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // We don't mark segment as unavailable.
     int requestId = 0;
@@ -1633,8 +1637,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // Add a new segment to ideal state with missing external view.
     String newSeg = "segment2";
@@ -1719,8 +1723,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // First selection, we select instance1 for newSeg.
     int requestId = 0;
@@ -1791,8 +1795,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // No selection because the external view is not in ideal state.
     int requestId = 0;
@@ -1837,8 +1841,8 @@ public class InstanceSelectorTest {
 
     ExternalView externalView = createExternalView(externalViewMap);
 
-    InstanceSelector selector = createTestInstanceSelector(selectorType);
-    selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState, 
externalView, onlineSegments);
+    InstanceSelector selector =
+        createTestInstanceSelector(selectorType, enabledInstances, idealState, 
externalView, onlineSegments);
 
     // We don't mark segment as unavailable.
     int requestId = 0;
@@ -1853,13 +1857,10 @@ public class InstanceSelectorTest {
   @Test
   public void testReplicaGroupAdaptiveServerSelector() {
     // Arrange
-    String offlineTableName = "testTable_OFFLINE";
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     HybridSelector hybridSelector = mock(HybridSelector.class);
-    ReplicaGroupInstanceSelector instanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, hybridSelector,
-            Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+    ReplicaGroupInstanceSelector instanceSelector = new 
ReplicaGroupInstanceSelector();
 
     // Define instances and segments
     String instance0 = "instance0";
@@ -1884,6 +1885,20 @@ public class InstanceSelectorTest {
     instanceCandidatesMap.put(segment2,
         Arrays.asList(new SegmentInstanceCandidate(instance4, true), new 
SegmentInstanceCandidate(instance3, true)));
 
+    IdealState idealState = createIdealState(
+        Map.of(segment0, List.of(Pair.of(instance0, ONLINE), 
Pair.of(instance1, ONLINE)), segment1,
+            List.of(Pair.of(instance2, ONLINE), Pair.of(instance3, ONLINE)), 
segment2,
+            List.of(Pair.of(instance3, ONLINE), Pair.of(instance4, ONLINE))));
+
+    ExternalView externalView = createExternalView(
+        Map.of(segment0, List.of(Pair.of(instance0, ONLINE), 
Pair.of(instance1, ONLINE)), segment1,
+            List.of(Pair.of(instance2, ONLINE), Pair.of(instance3, ONLINE)), 
segment2,
+            List.of(Pair.of(instance3, ONLINE), Pair.of(instance4, ONLINE))));
+
+    instanceSelector.init(_tableConfig, propertyStore, brokerMetrics, 
hybridSelector, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, Set.of(instance0, instance1, instance2, 
instance3, instance4), EMPTY_SERVER_MAP,
+        idealState, externalView, new HashSet<>(segments));
+
     // Define the segment states
     SegmentStates segmentStates = new SegmentStates(instanceCandidatesMap, new 
HashSet<>(segments), null);
 
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
index cad8f3fa524..3ee4159eb6b 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
@@ -59,7 +60,10 @@ public class MultiStageReplicaGroupSelectorTest {
   private final static List<String> SEGMENTS =
       Arrays.asList("segment0", "segment1", "segment2", "segment3", 
"segment4", "segment5", "segment6", "segment7",
           "segment8", "segment9", "segment10", "segment11");
-  private static final Map<String, ServerInstance> EMPTY_SERVER_MAP = 
Collections.EMPTY_MAP;
+
+  private static final Map<String, ServerInstance> EMPTY_SERVER_MAP = 
Collections.emptyMap();
+
+  private static final InstanceSelectorConfig INSTANCE_SELECTOR_CONFIG = new 
InstanceSelectorConfig(false, 300, false);
   private AutoCloseable _mocks;
   @Mock
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -69,6 +73,8 @@ public class MultiStageReplicaGroupSelectorTest {
   private BrokerRequest _brokerRequest;
   @Mock
   private PinotQuery _pinotQuery;
+  @Mock
+  private TableConfig _tableConfig;
 
   private static List<String> getSegments() {
     return SEGMENTS;
@@ -83,6 +89,7 @@ public class MultiStageReplicaGroupSelectorTest {
     _mocks = MockitoAnnotations.openMocks(this);
     when(_brokerRequest.getPinotQuery()).thenReturn(_pinotQuery);
     when(_pinotQuery.getQueryOptions()).thenReturn(null);
+    when(_tableConfig.getTableName()).thenReturn(TABLE_NAME);
   }
 
   @AfterMethod
@@ -104,7 +111,8 @@ public class MultiStageReplicaGroupSelectorTest {
     Set<String> onlineSegments = new HashSet<>();
 
     setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
 
     // Using requestId=0 should select replica-group 0. Even segments get 
assigned to instance-0 and odd segments get
@@ -167,12 +175,14 @@ public class MultiStageReplicaGroupSelectorTest {
     Set<String> onlineSegments = new HashSet<>();
 
     setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
 
     // If instance-0 is down, replica-group 1 should be picked even with 
requestId=0
     enabledInstances.remove("instance-0");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
     Map<String, String> expectedSelectorResult = 
createExpectedAssignment(replicaGroup1, getSegments());
     InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -180,7 +190,8 @@ public class MultiStageReplicaGroupSelectorTest {
 
     // If instance-2 also goes down, no replica-group is eligible
     enabledInstances.remove("instance-2");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
     try {
       multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -216,7 +227,8 @@ public class MultiStageReplicaGroupSelectorTest {
     externalView.getRecord().getMapFields().get("segment1").put("instance-1", 
"ERROR");
     externalView.getRecord().getMapFields().get("segment2").put("instance-2", 
"ERROR");
 
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
 
     // Even though instance-0 and instance-3 belong to different replica 
groups, they handle exclusive sets of segments
@@ -235,7 +247,8 @@ public class MultiStageReplicaGroupSelectorTest {
     // If instance-3 has an error segment as well, there is no replica group 
available to serve complete set of
     // segments.
     externalView.getRecord().getMapFields().get("segment3").put("instance-3", 
"ERROR");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
     try {
       multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -265,7 +278,8 @@ public class MultiStageReplicaGroupSelectorTest {
     Set<String> onlineSegments = new HashSet<>();
 
     setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+    multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics, 
null, Clock.systemUTC(),
+        INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances), 
EMPTY_SERVER_MAP, idealState, externalView,
         onlineSegments);
 
     InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -279,9 +293,7 @@ public class MultiStageReplicaGroupSelectorTest {
   }
 
   private MultiStageReplicaGroupSelector 
createMultiStageSelector(InstancePartitions instancePartitions) {
-    MultiStageReplicaGroupSelector multiStageSelector =
-        new MultiStageReplicaGroupSelector(TABLE_NAME, _propertyStore, 
_brokerMetrics, null, Clock.systemUTC(),
-            new InstanceSelectorConfig(false, 300, false));
+    MultiStageReplicaGroupSelector multiStageSelector = new 
MultiStageReplicaGroupSelector();
     multiStageSelector = spy(multiStageSelector);
     
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
     return multiStageSelector;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 65b1b00a48e..b3cde3ea82b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -29,10 +29,11 @@ public class RoutingConfig extends BaseJsonConfig {
   public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
   public static final String TIME_SEGMENT_PRUNER_TYPE = "time";
   public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
-  public static final String DEFAULT_INSTANCE_SELECTOR_TYPE = "balanced";
+  public static final String BALANCED_INSTANCE_SELECTOR_TYPE = "balanced";
   public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"replicaGroup";
   public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"strictReplicaGroup";
   public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE = 
"multiStageReplicaGroup";
+  public static final String DEFAULT_INSTANCE_SELECTOR_TYPE = 
BALANCED_INSTANCE_SELECTOR_TYPE;
 
   // Replaced by _segmentPrunerTypes and _instanceSelectorType
   @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to