This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 413b7cb  Add StrictReplicaGroupInstanceSelector (#6208)
413b7cb is described below

commit 413b7cb294a59628a2233c6c593295f817c33455
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Nov 1 01:54:00 2020 -0700

    Add StrictReplicaGroupInstanceSelector (#6208)
    
    Recently we have introduced some features which require all the segments
from the same partition to be processed by the same server. For example, the
upsert feature (#4261) requires all the segments of a partition to be loaded
on a single server before that server starts serving queries, in order to
ensure result correctness. The Strict Replica-Group Routing is designed to
meet this requirement.
    
    This PR is the routing-side change for the Strict Replica-Group Routing,
which is handled in the StrictReplicaGroupInstanceSelector class.
    The new algorithm relies on the ideal state of the table, so this PR also
adds the ideal state as an argument to the routing interfaces.
---
 .../pinot/broker/routing/RoutingManager.java       |  90 ++--
 .../instanceselector/BaseInstanceSelector.java     |  87 ++--
 .../routing/instanceselector/InstanceSelector.java |  18 +-
 .../instanceselector/InstanceSelectorFactory.java  |  22 +-
 .../StrictReplicaGroupInstanceSelector.java        | 181 ++++++++
 .../SegmentLineageBasedSegmentPreSelector.java     |   3 +-
 .../SegmentPreSelector.java                        |   6 +-
 .../SegmentPreSelectorFactory.java                 |   2 +-
 .../segmentpruner/PartitionSegmentPruner.java      |   6 +-
 .../routing/segmentpruner/SegmentPruner.java       |  18 +-
 .../segmentselector/OfflineSegmentSelector.java    |   7 +-
 .../segmentselector/RealtimeSegmentSelector.java   |   7 +-
 .../routing/segmentselector/SegmentSelector.java   |  14 +-
 .../routing/timeboundary/TimeBoundaryManager.java  |  27 +-
 .../instanceselector/InstanceSelectorTest.java     | 473 ++++++++++++++++-----
 .../SegmentPreSelectorTest.java                    |   4 +-
 .../routing/segmentpruner/SegmentPrunerTest.java   |  11 +-
 .../segmentselector/SegmentSelectorTest.java       |  11 +-
 .../timeboundary/TimeBoundaryManagerTest.java      |  43 +-
 .../apache/pinot/core/util/TableConfigUtils.java   |   6 +-
 .../pinot/core/util/TableConfigUtilsTest.java      |  26 +-
 .../pinot/spi/config/table/RoutingConfig.java      |   4 +-
 .../upsert_meetupRsvp_realtime_table_config.json   |   2 +-
 23 files changed, 798 insertions(+), 270 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index 940e40c..e80ca3d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -35,15 +35,16 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
 import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
 import 
org.apache.pinot.broker.routing.instanceselector.InstanceSelectorFactory;
+import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import 
org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
-import org.apache.pinot.broker.routing.segmentselector.SegmentPreSelector;
-import 
org.apache.pinot.broker.routing.segmentselector.SegmentPreSelectorFactory;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
@@ -154,13 +155,13 @@ public class RoutingManager implements 
ClusterChangeHandler {
                   tableNameWithType);
               continue;
             }
-            Set<String> onlineSegments = getOnlineSegments(tableNameWithType);
-            if (onlineSegments == null) {
+            IdealState idealState = getIdealState(tableNameWithType);
+            if (idealState == null) {
               LOGGER
                   .warn("Failed to find ideal state for table: {}, skipping 
updating routing entry", tableNameWithType);
               continue;
             }
-            routingEntry.onExternalViewChange(externalView, onlineSegments);
+            routingEntry.onExternalViewChange(externalView, idealState);
           } catch (Exception e) {
             LOGGER
                 .error("Caught unexpected exception while updating routing 
entry on external view change for table: {}",
@@ -190,19 +191,12 @@ public class RoutingManager implements 
ClusterChangeHandler {
   }
 
   @Nullable
-  private Set<String> getOnlineSegments(String tableNameWithType) {
-    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + 
tableNameWithType, null, AccessOption.PERSISTENT);
+  private IdealState getIdealState(String tableNameWithType) {
+    Stat stat = new Stat();
+    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + 
tableNameWithType, stat, AccessOption.PERSISTENT);
     if (znRecord != null) {
-      Map<String, Map<String, String>> segmentAssignment = 
znRecord.getMapFields();
-      Set<String> onlineSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
-      for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
-        Map<String, String> instanceStateMap = entry.getValue();
-        if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
instanceStateMap
-            .containsValue(SegmentStateModel.CONSUMING)) {
-          onlineSegments.add(entry.getKey());
-        }
-      }
-      return onlineSegments;
+      znRecord.setVersion(stat.getVersion());
+      return new IdealState(znRecord);
     } else {
       return null;
     }
@@ -212,9 +206,9 @@ public class RoutingManager implements ClusterChangeHandler 
{
     LOGGER.info("Processing instance config change");
     long startTimeMs = System.currentTimeMillis();
 
-    List<ZNRecord> instanceConfigZNRecords =
-        _zkDataAccessor.getChildren(_instanceConfigsPath, null, 
AccessOption.PERSISTENT,
-            CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+    List<ZNRecord> instanceConfigZNRecords = _zkDataAccessor
+        .getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT, 
CommonConstants.Helix.ZkClient.RETRY_COUNT,
+            CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
 
     // Calculate new enabled and disabled instances
@@ -284,6 +278,7 @@ public class RoutingManager implements ClusterChangeHandler 
{
     if 
("true".equals(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS)))
 {
       return false;
     }
+    //noinspection RedundantIfStatement
     if 
("true".equals(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)))
 {
       return false;
     }
@@ -310,22 +305,22 @@ public class RoutingManager implements 
ClusterChangeHandler {
       externalViewVersion = externalView.getRecord().getVersion();
     }
 
-    Set<String> onlineSegments = getOnlineSegments(tableNameWithType);
-    Preconditions.checkState(onlineSegments != null, "Failed to find ideal 
state for table: %s", tableNameWithType);
+    IdealState idealState = getIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
 
-    Set<String> enabledInstances = _enabledServerInstanceMap.keySet();
+    Set<String> onlineSegments = getOnlineSegments(idealState);
 
     SegmentPreSelector segmentPreSelector =
         SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
     Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
     SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
-    segmentSelector.init(externalView, preSelectedOnlineSegments);
+    segmentSelector.init(externalView, idealState, preSelectedOnlineSegments);
     List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
     for (SegmentPruner segmentPruner : segmentPruners) {
-      segmentPruner.init(externalView, preSelectedOnlineSegments);
+      segmentPruner.init(externalView, idealState, preSelectedOnlineSegments);
     }
     InstanceSelector instanceSelector = 
InstanceSelectorFactory.getInstanceSelector(tableConfig, _brokerMetrics);
-    instanceSelector.init(enabledInstances, externalView, 
preSelectedOnlineSegments);
+    instanceSelector.init(_enabledServerInstanceMap.keySet(), externalView, 
idealState, preSelectedOnlineSegments);
 
     // Add time boundary manager if both offline and real-time part exist for 
a hybrid table
     TimeBoundaryManager timeBoundaryManager = null;
@@ -336,7 +331,7 @@ public class RoutingManager implements ClusterChangeHandler 
{
       if (_routingEntryMap.containsKey(realtimeTableName)) {
         LOGGER.info("Adding time boundary manager for table: {}", 
tableNameWithType);
         timeBoundaryManager = new TimeBoundaryManager(tableConfig, 
_propertyStore);
-        timeBoundaryManager.init(externalView, onlineSegments);
+        timeBoundaryManager.init(externalView, idealState, 
preSelectedOnlineSegments);
       }
     } else {
       // Current table is real-time
@@ -355,12 +350,18 @@ public class RoutingManager implements 
ClusterChangeHandler {
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
-        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableName);
-        Preconditions.checkState(offlineTableOnlineSegments != null, "Failed 
to find ideal state for table: %s",
-            offlineTableName);
+        IdealState offlineTableIdealState = getIdealState(offlineTableName);
+        Preconditions
+            .checkState(offlineTableIdealState != null, "Failed to find ideal 
state for table: %s", offlineTableName);
+        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
+        SegmentPreSelector offlineTableSegmentPreSelector =
+            
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
+        Set<String> offlineTablePreSelectedOnlineSegments =
+            
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
         TimeBoundaryManager offlineTableTimeBoundaryManager =
             new TimeBoundaryManager(offlineTableConfig, _propertyStore);
-        offlineTableTimeBoundaryManager.init(offlineTableExternalView, 
offlineTableOnlineSegments);
+        offlineTableTimeBoundaryManager
+            .init(offlineTableExternalView, offlineTableIdealState, 
offlineTablePreSelectedOnlineSegments);
         
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
       }
     }
@@ -379,6 +380,22 @@ public class RoutingManager implements 
ClusterChangeHandler {
   }
 
   /**
+   * Returns the online segments (with ONLINE/CONSUMING instances) in the 
given ideal state.
+   */
+  private static Set<String> getOnlineSegments(IdealState idealState) {
+    Map<String, Map<String, String>> segmentAssignment = 
idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new 
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
+    for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || 
instanceStateMap
+          .containsValue(SegmentStateModel.CONSUMING)) {
+        onlineSegments.add(entry.getKey());
+      }
+    }
+    return onlineSegments;
+  }
+
+  /**
    * Removes the routing for the given table.
    */
   public synchronized void removeRouting(String tableNameWithType) {
@@ -525,15 +542,16 @@ public class RoutingManager implements 
ClusterChangeHandler {
     // NOTE: The change gets applied in sequence, and before change applied to 
all components, there could be some
     // inconsistency between components, which is fine because the 
inconsistency only exists for the newly changed
     // segments and only lasts for a very short time.
-    void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments) {
+    void onExternalViewChange(ExternalView externalView, IdealState 
idealState) {
+      Set<String> onlineSegments = getOnlineSegments(idealState);
       Set<String> preSelectedOnlineSegments = 
_segmentPreSelector.preSelect(onlineSegments);
-      _segmentSelector.onExternalViewChange(externalView, 
preSelectedOnlineSegments);
+      _segmentSelector.onExternalViewChange(externalView, idealState, 
preSelectedOnlineSegments);
       for (SegmentPruner segmentPruner : _segmentPruners) {
-        segmentPruner.onExternalViewChange(externalView, 
preSelectedOnlineSegments);
+        segmentPruner.onExternalViewChange(externalView, idealState, 
preSelectedOnlineSegments);
       }
-      _instanceSelector.onExternalViewChange(externalView, 
preSelectedOnlineSegments);
+      _instanceSelector.onExternalViewChange(externalView, idealState, 
preSelectedOnlineSegments);
       if (_timeBoundaryManager != null) {
-        _timeBoundaryManager.onExternalViewChange(externalView, 
preSelectedOnlineSegments);
+        _timeBoundaryManager.onExternalViewChange(externalView, idealState, 
preSelectedOnlineSegments);
       }
       _lastUpdateExternalViewVersion = externalView.getStat().getVersion();
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 512f721..7fb8076 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
@@ -68,9 +71,10 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   }
 
   @Override
-  public void init(Set<String> enabledInstances, ExternalView externalView, 
Set<String> onlineSegments) {
+  public void init(Set<String> enabledInstances, ExternalView externalView, 
IdealState idealState,
+      Set<String> onlineSegments) {
     _enabledInstances = enabledInstances;
-    onExternalViewChange(externalView, onlineSegments);
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   /**
@@ -126,47 +130,28 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
    * {@inheritDoc}
    *
    * <p>Updates the cached maps ({@code segmentToOnlineInstancesMap}, {@code 
segmentToOfflineInstancesMap} and
-   * {@code instanceToSegmentsMap}) based on the given ExternalView and 
re-calculates
-   * {@code segmentToEnabledInstancesMap} and {@code unavailableSegments} 
based on the cached states.
+   * {@code instanceToSegmentsMap}) and re-calculates {@code 
segmentToEnabledInstancesMap} and
+   * {@code unavailableSegments} based on the cached states.
    */
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments) {
-    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
-    int numSegments = segmentAssignment.size();
-    _segmentToOnlineInstancesMap = new 
HashMap<>(HashUtil.getHashMapCapacity(numSegments));
-    _segmentToOfflineInstancesMap = new 
HashMap<>(HashUtil.getHashMapCapacity(numSegments));
+  public void onExternalViewChange(ExternalView externalView, IdealState 
idealState, Set<String> onlineSegments) {
+    int numSegments = onlineSegments.size();
+    int segmentMapCapacity = HashUtil.getHashMapCapacity(numSegments);
+    _segmentToOnlineInstancesMap = new HashMap<>(segmentMapCapacity);
+    _segmentToOfflineInstancesMap = new HashMap<>(segmentMapCapacity);
     if (_instanceToSegmentsMap != null) {
       _instanceToSegmentsMap = new 
HashMap<>(HashUtil.getHashMapCapacity(_instanceToSegmentsMap.size()));
     } else {
       _instanceToSegmentsMap = new HashMap<>();
     }
 
-    for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
-      String segment = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      // NOTE: Instances will be sorted here because 'instanceStateMap' is a 
TreeMap
-      List<String> onlineInstances = new ArrayList<>(instanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>(instanceStateMap.size());
-      _segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      _segmentToOfflineInstancesMap.put(segment, offlineInstances);
-      for (Map.Entry<String, String> instanceStateEntry : 
instanceStateMap.entrySet()) {
-        String instance = instanceStateEntry.getKey();
-        String state = instanceStateEntry.getValue();
-        // Do not track instances in ERROR state
-        if (!state.equals(SegmentStateModel.ERROR)) {
-          _instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
-          if (state.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
-            onlineInstances.add(instance);
-          }
-        }
-      }
-    }
+    // Update the cached maps
+    updateSegmentMaps(externalView, idealState, onlineSegments, 
_segmentToOnlineInstancesMap,
+        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
 
     // Generate a new map from segment to enabled ONLINE/CONSUMING instances 
and a new set of unavailable segments (no
     // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap = new 
HashMap<>(HashUtil.getHashMapCapacity(numSegments));
+    Map<String, List<String>> segmentToEnabledInstancesMap = new 
HashMap<>(segmentMapCapacity);
     Set<String> unavailableSegments = new HashSet<>();
     // NOTE: Put null as the value when there is no enabled instances for a 
segment so that segmentToEnabledInstancesMap
     // always contains all segments. With this, in onInstancesChange() we can 
directly iterate over
@@ -183,6 +168,44 @@ abstract class BaseInstanceSelector implements 
InstanceSelector {
   }
 
   /**
+   * Updates the segment maps based on the given external view, ideal state 
and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
+   */
+  void updateSegmentMaps(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
List<String>> segmentToOfflineInstancesMap,
+      Map<String, List<String>> instanceToSegmentsMap) {
+    // Iterate over the external view instead of the online segments so that 
the map lookups are performed on the
+    // HashSet instead of the TreeSet for performance
+    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      Map<String, String> instanceStateMap = entry.getValue();
+      List<String> onlineInstances = new ArrayList<>(instanceStateMap.size());
+      List<String> offlineInstances = new ArrayList<>();
+      segmentToOnlineInstancesMap.put(segment, onlineInstances);
+      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      for (Map.Entry<String, String> instanceStateEntry : 
instanceStateMap.entrySet()) {
+        String instance = instanceStateEntry.getKey();
+        String state = instanceStateEntry.getValue();
+        // Do not track instances in ERROR state
+        if (!state.equals(SegmentStateModel.ERROR)) {
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
+          if (state.equals(SegmentStateModel.OFFLINE)) {
+            offlineInstances.add(instance);
+          } else {
+            onlineInstances.add(instance);
+          }
+        }
+      }
+      // Sort the online instances for replica-group routing to work. For 
multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will 
always point to the same instance.
+      onlineInstances.sort(null);
+    }
+  }
+
+  /**
    * Calculates the enabled ONLINE/CONSUMING instances for the given segment, 
and updates the unavailable segments (no
    * enabled instance or all enabled instances are in ERROR state).
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4619ada..47b777a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -31,12 +32,11 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface InstanceSelector {
 
   /**
-   * Initializes the instance selector with the enabled instances, external 
view and online segments (segments with
-   * ONLINE/CONSUMING instances in ideal state). Should be called only once 
before calling other methods.
-   * <p>NOTE: {@code onlineSegments} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * Initializes the instance selector with the enabled instances, external 
view, ideal state and online segments
+   * (segments with ONLINE/CONSUMING instances in the ideal state and selected 
by the pre-selector). Should be called
+   * only once before calling other methods.
    */
-  void init(Set<String> enabledInstances, ExternalView externalView, 
Set<String> onlineSegments);
+  void init(Set<String> enabledInstances, ExternalView externalView, 
IdealState idealState, Set<String> onlineSegments);
 
   /**
    * Processes the instances change. Changed instances are pre-computed based 
on the current and previous enabled
@@ -45,12 +45,10 @@ public interface InstanceSelector {
   void onInstancesChange(Set<String> enabledInstances, List<String> 
changedInstances);
 
   /**
-   * Processes the external view change based on the given online segments 
(segments with ONLINE/CONSUMING instances in
-   * ideal state).
-   * <p>NOTE: {@code onlineSegments} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * Processes the external view change based on the given ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments);
 
   /**
    * Selects the server instances for the given segments queried by the given 
broker request, returns a map from segment
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index a951dae..8410c46 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -38,14 +38,20 @@ public class InstanceSelectorFactory {
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig, 
BrokerMetrics brokerMetrics) {
     String tableNameWithType = tableConfig.getTableName();
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
-    if (routingConfig != null && (
-        
RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
-            || (tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING
-            .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())) || (
-            tableConfig.getTableType() == TableType.REALTIME && 
LEGACY_REPLICA_GROUP_REALTIME_ROUTING
-                
.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))) {
-      LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
-      return new ReplicaGroupInstanceSelector(tableNameWithType, 
brokerMetrics);
+    if (routingConfig != null) {
+      if 
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
+          || (tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING
+          .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())) || (
+          tableConfig.getTableType() == TableType.REALTIME && 
LEGACY_REPLICA_GROUP_REALTIME_ROUTING
+              .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName()))) {
+        LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
+        return new ReplicaGroupInstanceSelector(tableNameWithType, 
brokerMetrics);
+      }
+      if (RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+          .equalsIgnoreCase(routingConfig.getInstanceSelectorType())) {
+        LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
+        return new StrictReplicaGroupInstanceSelector(tableNameWithType, 
brokerMetrics);
+      }
     }
     return new BalancedInstanceSelector(tableNameWithType, brokerMetrics);
   }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
new file mode 100644
index 0000000..a368c79
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.common.utils.HashUtil;
+
+
+/**
+ * Instance selector for strict replica-group routing strategy.
+ *
+ * <pre>
+ * The strict replica-group routing strategy always routes the query to the 
instances within the same replica-group.
+ * (Note that the replica-group information is derived from the ideal state of 
the table, where the instances are sorted
+ * alphabetically in the instance state map, so the replica-groups in the 
instance selector might not match the
+ * replica-groups in the instance partitions.) The instances in a 
replica-group should have all the online segments
+ * (segments with ONLINE/CONSUMING instances in the ideal state and selected 
by the pre-selector) available
+ * (ONLINE/CONSUMING in the external view) in order to serve queries. If any 
segment is unavailable in the
+ * replica-group, we mark the whole replica-group down and do not serve queries 
with this replica-group.
+ *
+ * The selection algorithm is the same as {@link 
ReplicaGroupInstanceSelector}, and will always evenly distribute the
+ * traffic to all replica-groups that have all online segments available.
+ *
+ * The algorithm relies on the mirror segment assignment from replica-group 
segment assignment strategy. With mirror
+ * segment assignment, any server in one replica-group will always have a 
corresponding server in other replica-groups
+ * that have the same segments assigned. For example, if S1 is a server in 
replica-group 1, and it has mirror server
+ * S2 in replica-group 2 and S3 in replica-group 3, all segments assigned to 
S1 will also be assigned to S2 and S3. In
+ * stable scenario (external view matches ideal state), all segments assigned 
to S1 will have the same enabled instances
+ * of [S1, S2, S3] sorted (in alphabetical order). If we always pick the same 
index of enabled instances for all
+ * segments, only one of S1, S2, S3 will be picked, and all the segments are 
processed by the same server. In
+ * transitioning/error scenario (external view does not match ideal state), if 
a segment is down on S1, we mark all
+ * segments with the same assignment ([S1, S2, S3]) down on S1 to ensure that 
we always route the segments to the same
+ * replica-group.
+ * </pre>
+ */
+public class StrictReplicaGroupInstanceSelector extends 
ReplicaGroupInstanceSelector {
+
+  public StrictReplicaGroupInstanceSelector(String tableNameWithType, 
BrokerMetrics brokerMetrics) {
+    super(tableNameWithType, brokerMetrics);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <pre>
+   * The maps are calculated in the following steps to meet the strict 
replica-group guarantee:
+   *   1. Create a map from online segment to set of instances hosting the 
segment based on the ideal state
+   *   2. Gather the online and offline instances for each online segment from 
the external view
+   *   3. Compare the instances from the ideal state and the external view and 
gather the unavailable instances for each
+   *      set of instances
+   *   4. Exclude the unavailable instances from the online instances map
+   * </pre>
+   */
+  @Override
+  void updateSegmentMaps(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, 
List<String>> segmentToOfflineInstancesMap,
+      Map<String, List<String>> instanceToSegmentsMap) {
+    // Iterate over the ideal state to fill up 
'idealStateSegmentToInstancesMap' which is a map from segment to set of
+    // instances hosting the segment in the ideal state
+    int segmentMapCapacity = 
HashUtil.getHashMapCapacity(onlineSegments.size());
+    Map<String, Set<String>> idealStateSegmentToInstancesMap = new 
HashMap<>(segmentMapCapacity);
+    for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      // Only track online segments
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      idealStateSegmentToInstancesMap.put(segment, entry.getValue().keySet());
+    }
+
+    // Iterate over the external view to fill up 
'tempSegmentToOnlineInstancesMap' and 'segmentToOfflineInstancesMap'.
+    // 'tempSegmentToOnlineInstancesMap' is a temporary map from segment to 
set of instances that are in the ideal state
+    // and also ONLINE/CONSUMING in the external view. This map does not have 
the strict replica-group guarantee, and
+    // will be used to calculate the final 'segmentToOnlineInstancesMap'.
+    Map<String, Set<String>> tempSegmentToOnlineInstancesMap = new 
HashMap<>(segmentMapCapacity);
+    for (Map.Entry<String, Map<String, String>> entry : 
externalView.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      Set<String> instancesInIdealState = 
idealStateSegmentToInstancesMap.get(segment);
+      // Only track online segments
+      if (instancesInIdealState == null) {
+        continue;
+      }
+      Map<String, String> instanceStateMap = entry.getValue();
+      Set<String> tempOnlineInstances = new TreeSet<>();
+      List<String> offlineInstances = new ArrayList<>();
+      tempSegmentToOnlineInstancesMap.put(segment, tempOnlineInstances);
+      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      for (Map.Entry<String, String> instanceStateEntry : 
instanceStateMap.entrySet()) {
+        String instance = instanceStateEntry.getKey();
+        // Only track instances within the ideal state
+        if (!instancesInIdealState.contains(instance)) {
+          continue;
+        }
+        String state = instanceStateEntry.getValue();
+        if (state.equals(SegmentStateModel.ONLINE) || 
state.equals(SegmentStateModel.CONSUMING)) {
+          tempOnlineInstances.add(instance);
+        } else if (state.equals(SegmentStateModel.OFFLINE)) {
+          offlineInstances.add(instance);
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
+        }
+      }
+    }
+
+    // Iterate over the 'tempSegmentToOnlineInstancesMap' to gather the 
unavailable instances for each set of instances
+    Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
tempSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> tempOnlineInstances = entry.getValue();
+      Set<String> instancesInIdealState = 
idealStateSegmentToInstancesMap.get(segment);
+      // NOTE: When a segment is unavailable on all the instances, do not 
count all the instances as unavailable because
+      //       this segment is unavailable and won't be included in the 
routing table, thus not breaking the requirement
+      //       of routing to the same replica-group. This is normal for new 
added segments, and we don't want to mark
+      //       all instances down on all segments with the same assignment.
+      if (tempOnlineInstances.size() == instancesInIdealState.size() || 
tempOnlineInstances.isEmpty()) {
+        continue;
+      }
+      Set<String> unavailableInstances =
+          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> 
new TreeSet<>());
+      for (String instance : instancesInIdealState) {
+        if (!tempOnlineInstances.contains(instance)) {
+          unavailableInstances.add(instance);
+        }
+      }
+    }
+
+    // Iterate over the 'tempSegmentToOnlineInstancesMap' again to fill up the 
'segmentToOnlineInstancesMap' which has
+    // the strict replica-group guarantee
+    for (Map.Entry<String, Set<String>> entry : 
tempSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> tempOnlineInstances = entry.getValue();
+      // NOTE: Instances will be sorted here because 'tempOnlineInstances' is 
a TreeSet. We need the online instances to
+      //       be sorted for replica-group routing to work. For multiple 
segments with the same online instances, if the
+      //       list is sorted, the same index in the list will always point to 
the same instance.
+      List<String> onlineInstances = new 
ArrayList<>(tempOnlineInstances.size());
+      segmentToOnlineInstancesMap.put(segment, onlineInstances);
+
+      Set<String> instancesInIdealState = 
idealStateSegmentToInstancesMap.get(segment);
+      Set<String> unavailableInstances = 
unavailableInstancesMap.get(instancesInIdealState);
+      if (unavailableInstances == null) {
+        // No unavailable instance, add all instances as online instance
+        for (String instance : tempOnlineInstances) {
+          onlineInstances.add(instance);
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
+        }
+      } else {
+        // Some instances are unavailable, add the remaining instances as 
online instance
+        for (String instance : tempOnlineInstances) {
+          if (!unavailableInstances.contains(instance)) {
+            onlineInstances.add(instance);
+            instanceToSegmentsMap.computeIfAbsent(instance, k -> new 
ArrayList<>()).add(segment);
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
similarity index 96%
rename from 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java
rename to 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
index 48d1c8c..f8c4c55 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
@@ -16,9 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
-import java.util.HashSet;
 import java.util.Set;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
similarity index 86%
rename from 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java
rename to 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
index 4d94c8d..bb07bdd 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import java.util.Set;
 
@@ -34,8 +34,8 @@ import java.util.Set;
 public interface SegmentPreSelector {
 
   /**
-   * Process pre-selection for online segments to filter out unnecessary 
online segments. It is safe to modify the input
-   * online segments.
+   * Pre-selects the online segments to filter out the unnecessary segments. 
This method might modify the online segment
+   * set passed in.
    */
   Set<String> preSelect(Set<String> onlineSegments);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
similarity index 95%
rename from 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java
rename to 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
index 55f0e73..5385870 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
index abc7d3c..bc05bcf 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
@@ -64,7 +65,7 @@ public class PartitionSegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
+  public void init(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments) {
     // Bulk load partition info for all online segments
     int numSegments = onlineSegments.size();
     List<String> segments = new ArrayList<>(onlineSegments);
@@ -124,7 +125,8 @@ public class PartitionSegmentPruner implements 
SegmentPruner {
   }
 
   @Override
-  public synchronized void onExternalViewChange(ExternalView externalView, 
Set<String> onlineSegments) {
+  public synchronized void onExternalViewChange(ExternalView externalView, 
IdealState idealState,
+      Set<String> onlineSegments) {
     // NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
     for (String segment : onlineSegments) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
index 6197fe8..fb7afa6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.routing.segmentpruner;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -30,20 +31,17 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface SegmentPruner {
 
   /**
-   * Initializes the segment pruner with the external view and online segments 
(segments with ONLINE/CONSUMING instances
-   * in ideal state). Should be called only once before calling other methods.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * Initializes the segment pruner with the external view, ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector). Should be called only once before
+   * calling other methods.
    */
-  void init(ExternalView externalView, Set<String> onlineSegments);
+  void init(ExternalView externalView, IdealState idealState, Set<String> 
onlineSegments);
 
   /**
-   * Processes the external view change based on the given online segments 
(segments with ONLINE/CONSUMING instances in
-   * ideal state).
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * Processes the external view change based on the given ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments);
 
   /**
    * Refreshes the metadata for the given segment (called when segment is 
getting refreshed).
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
index e71f45a..0166e4c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -33,12 +34,12 @@ public class OfflineSegmentSelector implements 
SegmentSelector {
   private volatile List<String> _segments;
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
-    onExternalViewChange(externalView, onlineSegments);
+  public void init(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments) {
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments) {
+  public void onExternalViewChange(ExternalView externalView, IdealState 
idealState, Set<String> onlineSegments) {
     // TODO: for new added segments, before all replicas are up, consider not 
selecting them to avoid causing
     //       hotspot servers
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index a052da2..ab79c78 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.common.utils.HLCSegmentName;
@@ -55,12 +56,12 @@ public class RealtimeSegmentSelector implements 
SegmentSelector {
   private volatile List<String> _llcSegments;
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
-    onExternalViewChange(externalView, onlineSegments);
+  public void init(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments) {
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments) {
+  public void onExternalViewChange(ExternalView externalView, IdealState 
idealState, Set<String> onlineSegments) {
     // Group HLC segments by their group id
     // NOTE: Use TreeMap so that group ids are sorted and the result is 
deterministic
     Map<String, List<String>> groupIdToHLCSegmentsMap = new TreeMap<>();
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
index a140ddf..3909a26 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.routing.segmentselector;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -43,16 +44,17 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface SegmentSelector {
 
   /**
-   * Initializes the segment selector with the external view and online 
segments (segments with ONLINE/CONSUMING
-   * instances in ideal state). Should be called only once before calling 
other methods.
+   * Initializes the segment selector with the external view, ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector). Should be called only once before
+   * calling other methods.
    */
-  void init(ExternalView externalView, Set<String> onlineSegments);
+  void init(ExternalView externalView, IdealState idealState, Set<String> 
onlineSegments);
 
   /**
-   * Processes the external view change based on the given online segments 
(segments with ONLINE/CONSUMING instances in
-   * ideal state).
+   * Processes the external view change based on the given ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> 
onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments);
 
   /**
    * Selects the segments queried by the given broker request. The segments 
selected should cover the whole dataset
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 7462913..c7a64bb 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -82,21 +83,22 @@ public class TimeBoundaryManager {
 
     // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 
HOUR) as the time boundary; otherwise, use
     // (maxEndTime - 1 DAY)
-    _isHourlyTable = 
CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
-        && _timeUnit != TimeUnit.DAYS;
+    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
+        
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) 
&& _timeUnit != TimeUnit.DAYS;
 
     LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, 
timeUnit: {}, isHourlyTable: {} for table: {}",
         _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName);
   }
 
   /**
-   * Initializes the time boundary manager with the external view and online 
segments (segments with ONLINE/CONSUMING
-   * instances in ideal state). Should be called only once before calling 
other methods.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * Initializes the time boundary manager with the external view, ideal state 
and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector). Should be called only once before
+   * calling other methods.
+   * <p>NOTE: {@code externalView} and {@code idealState} are unused, but 
intentionally passed in in case they are
+   * needed in the future.
    */
   @SuppressWarnings("unused")
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
+  public void init(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments) {
     // Bulk load time info for all online segments
     int numSegments = onlineSegments.size();
     List<String> segments = new ArrayList<>(numSegments);
@@ -161,15 +163,16 @@ public class TimeBoundaryManager {
   }
 
   /**
-   * Processes the external view change based on the given online segments 
(segments with ONLINE/CONSUMING instances in
-   * ideal state).
+   * Processes the external view change based on the given ideal state and 
online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the 
pre-selector).
    * <p>NOTE: We don't update all the segment ZK metadata for every external 
view change, but only the new added/removed
    * ones. The refreshed segment ZK metadata change won't be picked up.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as 
argument in case it is needed in the
-   * future.
+   * <p>NOTE: {@code externalView} and {@code idealState} are unused, but 
intentionally passed in in case they are
+   * needed in the future.
    */
   @SuppressWarnings("unused")
-  public synchronized void onExternalViewChange(ExternalView externalView, 
Set<String> onlineSegments) {
+  public synchronized void onExternalViewChange(ExternalView externalView, 
IdealState idealState,
+      Set<String> onlineSegments) {
     for (String segment : onlineSegments) {
       _endTimeMap.computeIfAbsent(segment, k -> 
extractEndTimeFromSegmentZKMetadataZNRecord(segment,
           _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, 
AccessOption.PERSISTENT)));
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 3806a27..79fd518 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -66,6 +67,11 @@ public class InstanceSelectorTest {
     assertTrue(InstanceSelectorFactory
         .getInstanceSelector(tableConfig, brokerMetrics) instanceof 
ReplicaGroupInstanceSelector);
 
+    // Strict replica-group instance selector should be returned
+    
when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
+    assertTrue(InstanceSelectorFactory
+        .getInstanceSelector(tableConfig, brokerMetrics) instanceof 
StrictReplicaGroupInstanceSelector);
+
     // Should be backward-compatible with legacy config
     when(routingConfig.getInstanceSelectorType()).thenReturn(null);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
@@ -87,12 +93,15 @@ public class InstanceSelectorTest {
     BalancedInstanceSelector balancedInstanceSelector = new 
BalancedInstanceSelector(offlineTableName, brokerMetrics);
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
+        new StrictReplicaGroupInstanceSelector(offlineTableName, 
brokerMetrics);
 
     Set<String> enabledInstances = new HashSet<>();
     ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
-    // NOTE: Online segments is not used in the current implementation
-    Set<String> onlineSegments = Collections.emptySet();
+    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
 
     // 'instance0' and 'instance1' are in the same replica-group, 'instance2' 
and 'instance3' are in the same
     // replica-group; 'instance0' and 'instance2' serve the same segments, 
'instance1' and 'instance3' serve the same
@@ -115,34 +124,51 @@ public class InstanceSelectorTest {
     // Add 2 segments to each instance
     //   [segment0, segment1] -> [instance0, instance2, errorInstance0]
     //   [segment2, segment3] -> [instance1, instance3, errorInstance1]
-    Map<String, String> instanceStateMap0 = new TreeMap<>();
-    instanceStateMap0.put(instance0, ONLINE);
-    instanceStateMap0.put(instance2, ONLINE);
-    instanceStateMap0.put(errorInstance0, ERROR);
     String segment0 = "segment0";
     String segment1 = "segment1";
-    segmentAssignment.put(segment0, instanceStateMap0);
-    segmentAssignment.put(segment1, instanceStateMap0);
-    Map<String, String> instanceStateMap1 = new TreeMap<>();
-    instanceStateMap1.put(instance1, ONLINE);
-    instanceStateMap1.put(instance3, ONLINE);
-    instanceStateMap1.put(errorInstance1, ERROR);
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance0, ONLINE);
+    externalViewInstanceStateMap0.put(instance2, ONLINE);
+    externalViewInstanceStateMap0.put(errorInstance0, ERROR);
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    idealStateInstanceStateMap0.put(instance0, ONLINE);
+    idealStateInstanceStateMap0.put(instance2, ONLINE);
+    idealStateInstanceStateMap0.put(errorInstance0, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0);
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment1);
     String segment2 = "segment2";
     String segment3 = "segment3";
-    segmentAssignment.put(segment2, instanceStateMap1);
-    segmentAssignment.put(segment3, instanceStateMap1);
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+    externalViewInstanceStateMap1.put(instance1, ONLINE);
+    externalViewInstanceStateMap1.put(instance3, ONLINE);
+    externalViewInstanceStateMap1.put(errorInstance1, ERROR);
+    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+    idealStateInstanceStateMap1.put(instance1, ONLINE);
+    idealStateInstanceStateMap1.put(instance3, ONLINE);
+    idealStateInstanceStateMap1.put(errorInstance1, ONLINE);
+    externalViewSegmentAssignment.put(segment2, externalViewInstanceStateMap1);
+    externalViewSegmentAssignment.put(segment3, externalViewInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap1);
+    onlineSegments.add(segment2);
+    onlineSegments.add(segment3);
     List<String> segments = Arrays.asList(segment0, segment1, segment2, 
segment3);
 
-    balancedInstanceSelector.init(enabledInstances, externalView, 
onlineSegments);
-    replicaGroupInstanceSelector.init(enabledInstances, externalView, 
onlineSegments);
+    balancedInstanceSelector.init(enabledInstances, externalView, idealState, 
onlineSegments);
+    replicaGroupInstanceSelector.init(enabledInstances, externalView, 
idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.init(enabledInstances, externalView, 
idealState, onlineSegments);
 
-    // For the first request:
+    // For the 1st request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance0
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance0
     //     segment1 -> instance0
     //     segment2 -> instance1
@@ -164,14 +190,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the second request:
+    // For the 2nd request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance0
     //     segment2 -> instance3
     //     segment3 -> instance1
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
@@ -192,19 +221,23 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Disable instance0
     enabledInstances.remove(instance0);
     balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
     replicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
+    strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
 
-    // For the third request:
+    // For the 3rd request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance1
@@ -225,14 +258,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the fourth request:
+    // For the 4th request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
@@ -253,22 +289,29 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Remove segment0 and add segment4
-    segmentAssignment.remove(segment0);
+    externalViewSegmentAssignment.remove(segment0);
+    idealStateSegmentAssignment.remove(segment0);
+    onlineSegments.remove(segment0);
     String segment4 = "segment4";
-    segmentAssignment.put(segment4, instanceStateMap0);
+    externalViewSegmentAssignment.put(segment4, externalViewInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0);
+    onlineSegments.add(segment4);
     segments = Arrays.asList(segment1, segment2, segment3, segment4);
 
     // Requests arrived before changes got picked up
 
-    // For the fifth request:
+    // For the 5th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> null
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -287,14 +330,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the sixth request:
+    // For the 6th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> null
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
@@ -313,18 +359,22 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Process the changes
-    balancedInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
-    replicaGroupInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
+    balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+    replicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
 
-    // For the seventy request:
+    // For the 7th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -345,14 +395,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the eighth request:
+    // For the 8th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
@@ -373,19 +426,23 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Re-enable instance0
     enabledInstances.add(instance0);
     balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
     replicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
+    strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance0));
 
-    // For the ninth request:
+    // For the 9th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance0
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance0
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -406,15 +463,100 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the tenth request:
+    // For the 10th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> instance0
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance3
+    //     segment3 -> instance3
+    //     segment4 -> instance2
+    expectedBalancedInstanceSelectorResult = new HashMap<>();
+    expectedBalancedInstanceSelectorResult.put(segment1, instance2);
+    expectedBalancedInstanceSelectorResult.put(segment2, instance1);
+    expectedBalancedInstanceSelectorResult.put(segment3, instance3);
+    expectedBalancedInstanceSelectorResult.put(segment4, instance0);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
+    expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
+    expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+
+    // segment1 runs into ERROR state on instance0
+    Map<String, String> externalViewInstanceStateMap2 = new TreeMap<>();
+    externalViewInstanceStateMap2.put(instance0, ERROR);
+    externalViewInstanceStateMap2.put(instance2, ONLINE);
+    externalViewInstanceStateMap2.put(errorInstance0, ERROR);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap2);
+    balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+    replicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
+
+    // For the 11th request:
+    //   BalancedInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance3
+    //     segment3 -> instance1
+    //     segment4 -> instance2
     //   ReplicaGroupInstanceSelector:
     //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance1
+    //     segment4 -> instance0
+    //   StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance1
+    //     segment4 -> instance2
+    expectedBalancedInstanceSelectorResult = new HashMap<>();
+    expectedBalancedInstanceSelectorResult.put(segment1, instance2);
+    expectedBalancedInstanceSelectorResult.put(segment2, instance3);
+    expectedBalancedInstanceSelectorResult.put(segment3, instance1);
+    expectedBalancedInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
+    expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
+    expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    Map<String, String> expectedStrictReplicaGroupInstanceSelectorResult = new 
HashMap<>();
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment2, instance1);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment3, instance1);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedStrictReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+
+    // For the 12th request:
+    //   BalancedInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance3
+    //     segment4 -> instance0
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> instance2
@@ -434,6 +576,9 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, 
segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
   @Test
@@ -441,124 +586,260 @@ public class InstanceSelectorTest {
     String offlineTableName = "testTable_OFFLINE";
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector = new 
BalancedInstanceSelector(offlineTableName, brokerMetrics);
+    // ReplicaGroupInstanceSelector has the same behavior as 
BalancedInstanceSelector for the unavailable segments
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
+        new StrictReplicaGroupInstanceSelector(offlineTableName, 
brokerMetrics);
 
     Set<String> enabledInstances = new HashSet<>();
     ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> segmentAssignment = 
externalView.getRecord().getMapFields();
-    // NOTE: Online segments is not used in the current implementation
-    Set<String> onlineSegments = Collections.emptySet();
+    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
 
     String instance = "instance";
     String errorInstance = "errorInstance";
-    Map<String, String> instanceStateMap = new TreeMap<>();
-    instanceStateMap.put(instance, CONSUMING);
-    instanceStateMap.put(errorInstance, ERROR);
-    String segment = "segment";
-    segmentAssignment.put(segment, instanceStateMap);
-    List<String> segments = Collections.singletonList(segment);
-
-    // Initialize with no enabled instance, segment should be unavailable
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance, CONSUMING);
+    externalViewInstanceStateMap0.put(errorInstance, ERROR);
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+    externalViewInstanceStateMap1.put(instance, CONSUMING);
+    externalViewInstanceStateMap1.put(errorInstance, ERROR);
+    Map<String, String> idealStateInstanceStateMap = new TreeMap<>();
+    idealStateInstanceStateMap.put(instance, CONSUMING);
+    idealStateInstanceStateMap.put(errorInstance, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap);
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment1);
+    List<String> segments = Arrays.asList(segment0, segment1);
+
+    // Initialize with no enabled instance, both segments should be unavailable
     // {
-    //   (disabled) instance: CONSUMING
-    //   (disabled) errorInstance: ERROR
+    //   segment0: {
+    //     (disabled) instance: CONSUMING,
+    //     (disabled) errorInstance: ERROR
+    //   },
+    //   segment1: {
+    //     (disabled) instance: CONSUMING,
+    //     (disabled) errorInstance: ERROR
+    //   }
     // }
-    balancedInstanceSelector.init(enabledInstances, externalView, 
onlineSegments);
+    balancedInstanceSelector.init(enabledInstances, externalView, idealState, 
onlineSegments);
+    strictReplicaGroupInstanceSelector.init(enabledInstances, externalView, 
idealState, onlineSegments);
     BrokerRequest brokerRequest = mock(BrokerRequest.class);
     InstanceSelector.SelectionResult selectionResult = 
balancedInstanceSelector.select(brokerRequest, segments);
     assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-    assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment));
+    assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, 
segments);
+    assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+    assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
     // Iterate 5 times
     for (int i = 0; i < 5; i++) {
 
-      // Enable the ERROR instance, segment should be unavailable
+      // Enable the ERROR instance, both segments should be unavailable
       // {
-      //   (disabled) instance: CONSUMING
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.add(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
-      // Enable the CONSUMING instance, segment should be available
+      // Enable the CONSUMING instance, both segments should be available
       // {
-      //   (enabled)  instance: CONSUMING
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.add(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), 
Collections.singletonMap(segment, instance));
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Change the CONSUMING instance to ONLINE, segment should be available
+      // Change the CONSUMING instance to ONLINE, both segments should be 
available
       // {
-      //   (enabled)  instance: ONLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(instance, ONLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
+      externalViewInstanceStateMap0.put(instance, ONLINE);
+      externalViewInstanceStateMap1.put(instance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), 
Collections.singletonMap(segment, instance));
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Change the ONLINE instance to OFFLINE, both segment to instance map 
and unavailable segment should be empty
+      // Switch the instance state for segment1, both segments should be 
available for BalancedInstanceSelector, but
+      // unavailable for StrictReplicaGroupInstanceSelector
       // {
-      //   (enabled)  instance: OFFLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: ERROR,
+      //     (enabled)  errorInstance: ONLINE
+      //   }
       // }
-      instanceStateMap.put(instance, OFFLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
+      externalViewInstanceStateMap1.put(instance, ERROR);
+      externalViewInstanceStateMap1.put(errorInstance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+
+      // Switch back the instance state for segment1 and change the ONLINE 
instance to OFFLINE; both the segment-to-instance
+      // map and the unavailable segments should be empty
+      // {
+      //   segment0: {
+      //     (enabled)  instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
+      // }
+      externalViewInstanceStateMap0.put(instance, OFFLINE);
+      externalViewInstanceStateMap1.put(instance, OFFLINE);
+      externalViewInstanceStateMap1.put(errorInstance, ERROR);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Disable the OFFLINE instance, segment should be unavailable
+      // Disable the OFFLINE instance, both segments should be unavailable
       // {
-      //   (disabled) instance: OFFLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.remove(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(instance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
-      // Change the ERROR instance to ONLINE, segment should be available
+      // Change the ERROR instance of segment0 to ONLINE, segment0 should be 
available
+      // (Note that for StrictReplicaGroupInstanceSelector, segment1 does not 
have any ONLINE/CONSUMING instance, so it
+      // won't mark instance down for segment0)
       // {
-      //   (disabled) instance: OFFLINE
-      //   (enabled)  errorInstance: ONLINE
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ONLINE
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(errorInstance, ONLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
+      externalViewInstanceStateMap0.put(errorInstance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), 
Collections.singletonMap(segment, errorInstance));
-      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
+      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment1));
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
+      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment1));
 
-      // Disable the ONLINE instance, segment should be unavailable
+      // Disable the ONLINE instance, both segments should be unavailable
       // {
-      //   (disabled) instance: OFFLINE
-      //   (disabled) errorInstance: ONLINE
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (disabled) errorInstance: ONLINE
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (disabled) errorInstance: ERROR
+      //   }
       // }
       enabledInstances.remove(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, 
Collections.singletonList(errorInstance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+      selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
 
-      // Change back to initial state, segment should be unavailable
+      // Change back to initial state, both segments should be unavailable
       // {
-      //   (disabled) instance: CONSUMING
-      //   (disabled) errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: CONSUMING,
+      //     (disabled) errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: CONSUMING,
+      //     (disabled) errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(instance, CONSUMING);
-      instanceStateMap.put(errorInstance, ERROR);
-      balancedInstanceSelector.onExternalViewChange(externalView, 
onlineSegments);
+      externalViewInstanceStateMap0.put(instance, CONSUMING);
+      externalViewInstanceStateMap0.put(errorInstance, ERROR);
+      externalViewInstanceStateMap1.put(instance, CONSUMING);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, 
idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, 
segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), 
Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
+      selectionResult = 
strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), 
Arrays.asList(segment0, segment1));
     }
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
similarity index 97%
rename from 
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java
rename to 
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
index d3bc30a..d19def7 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,7 +49,7 @@ public class SegmentPreSelectorTest {
     Set<String> onlineSegments = new HashSet<>();
 
     int numOfflineSegments = 5;
-    for (int i = 0 ; i< numOfflineSegments; i++ ) {
+    for (int i = 0; i < numOfflineSegments; i++) {
       String segmentName = "segment_" + i;
       externalView.setStateMap(segmentName, onlineInstanceStateMap);
       onlineSegments.add(segmentName);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 06657dd..8e46a5d 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
@@ -140,12 +141,14 @@ public class SegmentPrunerTest {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
     BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_2);
     BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_3);
+    // NOTE: External view and ideal state are not used in the current 
implementation.
     ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
 
     PartitionSegmentPruner segmentPruner =
         new PartitionSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN, 
_propertyStore);
     Set<String> onlineSegments = new HashSet<>();
-    segmentPruner.init(externalView, onlineSegments);
+    segmentPruner.init(externalView, idealState, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptyList()), 
Collections.emptyList());
     assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptyList()), 
Collections.emptyList());
     assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptyList()), 
Collections.emptyList());
@@ -166,7 +169,7 @@ public class SegmentPrunerTest {
     
segmentZKMetadataWithoutPartitionMetadata.setSegmentName(segmentWithoutPartitionMetadata);
     ZKMetadataProvider
         .setOfflineSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, 
segmentZKMetadataWithoutPartitionMetadata);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singletonList(segmentWithoutPartitionMetadata)),
         Collections.singletonList(segmentWithoutPartitionMetadata));
     assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singletonList(segmentWithoutPartitionMetadata)),
@@ -183,7 +186,7 @@ public class SegmentPrunerTest {
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKMetadata(segment1, "Murmur", 4, 0);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, 
segment1)),
         Arrays.asList(segment0, segment1));
     assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, 
segment1)),
@@ -193,7 +196,7 @@ public class SegmentPrunerTest {
 
     // Update partition metadata without refreshing should have no effect
     setSegmentZKMetadata(segment0, "Modulo", 4, 1);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, 
segment1)),
         Arrays.asList(segment0, segment1));
     assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, 
segment1)),
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
index b447bb6..051f8c6 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -61,10 +62,12 @@ public class SegmentSelectorTest {
     Map<String, String> onlineInstanceStateMap = 
Collections.singletonMap("server", ONLINE);
     Map<String, String> consumingInstanceStateMap = 
Collections.singletonMap("server", CONSUMING);
     Set<String> onlineSegments = new HashSet<>();
+    // NOTE: Ideal state is not used in the current implementation.
+    IdealState idealState = mock(IdealState.class);
 
     // Should return an empty list when there is no segment
     RealtimeSegmentSelector segmentSelector = new RealtimeSegmentSelector();
-    segmentSelector.init(externalView, onlineSegments);
+    segmentSelector.init(externalView, idealState, onlineSegments);
     BrokerRequest brokerRequest = mock(BrokerRequest.class);
     assertTrue(segmentSelector.select(brokerRequest).isEmpty());
 
@@ -83,7 +86,7 @@ public class SegmentSelectorTest {
       }
       hlcSegments[i] = hlcSegmentsForGroup;
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
 
     // Only HLC segments exist, should select the HLC segments from the first 
group
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
hlcSegments[0]);
@@ -107,7 +110,7 @@ public class SegmentSelectorTest {
         }
       }
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
 
     // Both HLC and LLC segments exist, should select the LLC segments
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
expectedSelectedLLCSegments);
@@ -122,7 +125,7 @@ public class SegmentSelectorTest {
         onlineSegments.remove(hlcSegment);
       }
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, 
onlineSegments);
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), 
expectedSelectedLLCSegments);
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 98f5d3e..00ca6c7 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
@@ -72,71 +73,73 @@ public class TimeBoundaryManagerTest {
 
   @Test
   public void testTimeBoundaryManager() {
-    ExternalView externalView = Mockito.mock(ExternalView.class);
-
     for (TimeUnit timeUnit : TimeUnit.values()) {
       // Test DAILY push table, with timeFieldSpec
       String rawTableName = "testTable_" + timeUnit + "_DAILY";
       TableConfig tableConfig = getTableConfig(rawTableName, "DAILY");
       setSchemaTimeFieldSpec(rawTableName, timeUnit);
-      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test HOURLY push table, with timeFieldSpec
       rawTableName = "testTable_" + timeUnit + "_HOURLY";
       tableConfig = getTableConfig(rawTableName, "HOURLY");
       setSchemaTimeFieldSpec(rawTableName, timeUnit);
-      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test DAILY push table with dateTimeFieldSpec
       rawTableName = "testTableDateTime_" + timeUnit + "_DAILY";
       tableConfig = getTableConfig(rawTableName, "DAILY");
       setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
-      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test HOURLY push table
       rawTableName = "testTableDateTime_" + timeUnit + "_HOURLY";
       tableConfig = getTableConfig(rawTableName, "HOURLY");
       setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
-      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit);
     }
   }
 
-  private void testDailyPushTable(String rawTableName, TableConfig 
tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+  private void testDailyPushTable(String rawTableName, TableConfig 
tableConfig, TimeUnit timeUnit) {
+    // NOTE: External view and ideal state are not used in the current 
implementation.
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+
     // Start with no segment
     TimeBoundaryManager timeBoundaryManager = new 
TimeBoundaryManager(tableConfig, _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     assertNull(timeBoundaryManager.getTimeBoundaryInfo());
 
     // Add the first segment should update the time boundary
     String segment0 = "segment0";
     onlineSegments.add(segment0);
     setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(1, TimeUnit.DAYS));
 
     // Add a new segment with larger end time should update the time boundary
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, 
onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(3, TimeUnit.DAYS));
 
     // Add a new segment with smaller end time should not change the time 
boundary
     String segment2 = "segment2";
     onlineSegments.add(segment2);
     setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, 
onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(3, TimeUnit.DAYS));
 
     // Remove the segment with largest end time should update the time boundary
     onlineSegments.remove(segment1);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, 
onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(2, TimeUnit.DAYS));
 
     // Change segment ZK metadata without refreshing should not update the 
time boundary
     setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, 
onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(2, TimeUnit.DAYS));
 
     // Refresh the changed segment should update the time boundary
@@ -144,13 +147,17 @@ public class TimeBoundaryManagerTest {
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), 
timeUnit.convert(4, TimeUnit.DAYS));
   }
 
-  private void testHourlyPushTable(String rawTableName, TableConfig 
tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+  private void testHourlyPushTable(String rawTableName, TableConfig 
tableConfig, TimeUnit timeUnit) {
+    // NOTE: External view and ideal state are not used in the current 
implementation.
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+
     TimeBoundaryManager timeBoundaryManager = new 
TimeBoundaryManager(tableConfig, _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
     onlineSegments.add(segment0);
     setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     long expectedTimeValue;
     if (timeUnit == TimeUnit.DAYS) {
       // Time boundary should be endTime - 1 DAY when time unit is DAYS
@@ -168,10 +175,8 @@ public class TimeBoundaryManagerTest {
   }
 
   private void setSchemaTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
-    ZKMetadataProvider.setSchema(_propertyStore,
-        new Schema.SchemaBuilder().setSchemaName(rawTableName)
-            .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
timeUnit, TIME_COLUMN), null)
-            .build());
+    ZKMetadataProvider.setSchema(_propertyStore, new 
Schema.SchemaBuilder().setSchemaName(rawTableName)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, timeUnit, 
TIME_COLUMN), null).build());
   }
 
   private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit 
timeUnit) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index 83d752c..882f017 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -242,7 +242,7 @@ public final class TableConfigUtils {
    * Validates the upsert-related configurations
    *  - check table type is realtime
    *  - the primary key exists on the schema
-   *  - replica group is configured for routing type
+   *  - strict replica-group is configured for routing type
    *  - consumer type must be low-level
    */
   protected static void validateUpsertConfig(TableConfig tableConfig, Schema 
schema) {
@@ -265,9 +265,9 @@ public final class TableConfigUtils {
         "Upsert table must use low-level streaming consumer type");
     // replica group is configured for routing
     Preconditions.checkState(
-        tableConfig.getRoutingConfig() != null && 
RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+        tableConfig.getRoutingConfig() != null && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
             
.equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()),
-        "Upsert table must use replica-group (i.e. replicaGroup) based 
routing");
+        "Upsert table must use strict replica-group (i.e. strictReplicaGroup) 
based routing");
     // no startree index
     Preconditions.checkState(
         
CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
 && !tableConfig
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index 8594463..9ddcb9c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -159,8 +159,7 @@ public class TableConfigUtilsTest {
 
     // empty timeColumnName - valid
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
-    tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid
@@ -506,8 +505,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setInvertedIndexColumns(Arrays.asList("")).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setInvertedIndexColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -519,8 +519,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setNoDictionaryColumns(Arrays.asList("")).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -532,8 +533,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setOnHeapDictionaryColumns(Arrays.asList("")).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setOnHeapDictionaryColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -742,11 +744,13 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Upsert table must use replica-group 
(i.e. replicaGroup) based routing");
+      Assert.assertEquals(e.getMessage(),
+          "Upsert table must use strict replica-group (i.e. 
strictReplicaGroup) based routing");
     }
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setRoutingConfig(new RoutingConfig(null, null, 
"replicaGroup")).setStreamConfigs(streamConfigs).build();
+        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
     } catch (Exception e) {
@@ -756,7 +760,7 @@ public class TableConfigUtilsTest {
         .singletonList(new 
AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), 10);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setRoutingConfig(new RoutingConfig(null, null, "replicaGroup"))
+        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index ee5b564..196e98d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -20,15 +20,15 @@ package org.apache.pinot.spi.config.table;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.pinot.spi.config.BaseJsonConfig;
-
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
 public class RoutingConfig extends BaseJsonConfig {
   public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
   public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"replicaGroup";
+  public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = 
"strictReplicaGroup";
 
   // Replaced by _segmentPrunerTypes and _instanceSelectorType
   @Deprecated
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
index 596e4f8..c285cab 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
@@ -31,7 +31,7 @@
     "customConfigs": {}
   },
   "routing": {
-    "instanceSelectorType": "replicaGroup"
+    "instanceSelectorType": "strictReplicaGroup"
   },
   "upsertConfig": {
     "mode": "FULL"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to